From 155a27d49d095cd0dc3a14029788788d7a66e742 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Thu, 27 Apr 2023 17:57:10 +0200 Subject: [PATCH 01/11] start implementing ovserver --- src/gateway.rs | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index d68be22..c7988a1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,4 +1,41 @@ #[derive(Debug)] -pub struct Gateway { - url: String, +pub struct Gateway {} + +pub trait Observer { + fn update(&self, data: &str); +} + +pub struct GatewayEvent<'a> { + observers: Vec<&'a dyn Observer>, + test_content: String, +} + +impl<'a> GatewayEvent<'a> { + pub fn new(test_content: String) -> Self { + Self { + observers: Vec::new(), + test_content, + } + } + + pub fn subscribe(&mut self, observable: &'a dyn Observer) { + self.observers.push(observable) + } + + pub fn unsubscribe(&mut self, observable: &'a dyn Observer) { + if let Some(index) = self.observers.iter().position(|&o| o == observable) { + self.observers.remove(index); + } + } + + pub fn update_data(&mut self, test_content: String) { + self.test_content = test_content; + self.notify(); + } + + pub fn notify(&self) { + for observer in &self.observers { + observer.update(&self.test_content); + } + } } From 77ff7d6510b5b5b4dcb6c4501b3648dcb862be0f Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Thu, 27 Apr 2023 22:29:07 +0200 Subject: [PATCH 02/11] Update observer implementation --- src/gateway.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index c7988a1..7206c4e 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -7,33 +7,43 @@ pub trait Observer { pub struct GatewayEvent<'a> { observers: Vec<&'a dyn Observer>, - test_content: String, + pub test_content: String, + pub is_observed: bool, } impl<'a> GatewayEvent<'a> { - pub fn new(test_content: String) -> Self { + fn new(test_content: String) -> Self { Self { + is_observed: false, observers: Vec::new(), test_content, } } + pub fn is_observed(&self) -> bool { + self.is_observed + } + pub fn subscribe(&mut self, observable: &'a dyn Observer) { + if self.is_observed { + return; + } + self.is_observed = true; self.observers.push(observable) } pub fn unsubscribe(&mut self, observable: &'a dyn Observer) { - if let Some(index) = self.observers.iter().position(|&o| o == observable) { - self.observers.remove(index); - } + self.observers.pop(); + self.is_observed = false; + return; } - pub fn update_data(&mut self, test_content: String) { + fn update_data(&mut self, test_content: String) { self.test_content = test_content; self.notify(); } - pub fn notify(&self) { + fn notify(&self) { for observer in &self.observers { observer.update(&self.test_content); } From 656acad356ca8fe327223f1f76225a790f1d9446 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Thu, 27 Apr 2023 22:38:41 +0200 Subject: [PATCH 03/11] Indicate that method call can fail --- src/gateway.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 7206c4e..9a05fec 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,3 +1,5 @@ +use crate::errors::ObserverError; + #[derive(Debug)] pub struct Gateway {} @@ -24,12 +26,13 @@ impl<'a> GatewayEvent<'a> { self.is_observed } - pub fn subscribe(&mut self, observable: &'a dyn Observer) { + pub fn subscribe(&mut self, observable: &'a dyn Observer) -> Option { if self.is_observed { - return; + return Some(ObserverError::AlreadySubscribedError); } self.is_observed = true; - self.observers.push(observable) + self.observers.push(observable); + None } pub fn unsubscribe(&mut self, observable: &'a dyn Observer) { From 8f97b48a096fada8a765f1c166e4a75bf5ac3984 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Thu, 27 Apr 2023 22:38:57 +0200 Subject: [PATCH 04/11] Add error type for Observers --- src/errors.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/errors.rs b/src/errors.rs index 58dc856..f74c7cd 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -19,3 +19,9 @@ custom_error! { InvalidFormBodyError{error_type: String, error:String} = "The server responded with: {error_type}: {error}", RateLimited = "Ratelimited.", } + +custom_error! { + #[derive(PartialEq, Eq)] + pub ObserverError + AlreadySubscribedError = "Each event can only be subscribed to once." +} From 50b6029f54468719171935a8a100995f499660fd Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 28 Apr 2023 12:31:59 +0200 Subject: [PATCH 05/11] Implement Observer for WebSocketEvents --- src/api/types.rs | 36 ++++++++++++++++++---- src/gateway.rs | 77 ++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 92 insertions(+), 21 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index c50520f..393ed01 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -2,7 +2,7 @@ To learn more about the types implemented here, please visit https://discord.com/developers/docs . I do not feel like re-documenting all of this, as everything is already perfectly explained there. - */ +*/ use std::fmt; @@ -10,6 +10,8 @@ use serde::{Deserialize, Serialize}; use crate::{api::limits::Limits, URLBundle}; +pub trait WebSocketEvent {} + #[derive(Debug, Serialize, Deserialize)] pub struct LoginResult { token: String, @@ -246,6 +248,8 @@ struct MessageCreate { mentions: Vec<(UserObject, GuildMember)>, // Not sure if this is correct: https://discord.com/developers/docs/topics/gateway-events#message-create } +impl WebSocketEvent for MessageCreate {} + #[derive(Debug, Serialize, Deserialize)] struct PartialMessage { id: Option, @@ -293,6 +297,8 @@ struct MessageUpdate { mentions: Vec<(UserObject, GuildMember)>, // Not sure if this is correct: https://discord.com/developers/docs/topics/gateway-events#message-create } +impl WebSocketEvent for MessageUpdate {} + #[derive(Debug, Serialize, Deserialize)] struct MessageDelete { id: String, @@ -300,6 +306,8 @@ struct MessageDelete { guild_id: Option, } +impl WebSocketEvent for MessageDelete {} + #[derive(Debug, Serialize, Deserialize)] struct MessageDeleteBulk { ids: Vec, @@ -307,6 +315,8 @@ struct MessageDeleteBulk { guild_id: Option, } +impl WebSocketEvent for MessageDeleteBulk {} + #[derive(Debug, Serialize, Deserialize)] struct MessageReactionAdd { user_id: String, @@ -317,6 +327,8 @@ struct MessageReactionAdd { emoji: Emoji, } +impl WebSocketEvent for MessageReactionAdd {} + #[derive(Debug, Serialize, Deserialize)] struct MessageReactionRemove { user_id: String, @@ -326,6 +338,8 @@ struct MessageReactionRemove { emoji: Emoji, } +impl WebSocketEvent for MessageReactionRemove {} + #[derive(Debug, Serialize, Deserialize)] struct MessageReactionRemoveAll { channel_id: String, @@ -333,6 +347,8 @@ struct MessageReactionRemoveAll { guild_id: Option, } +impl WebSocketEvent for MessageReactionRemoveAll {} + #[derive(Debug, Serialize, Deserialize)] struct MessageReactionRemoveEmoji { channel_id: String, @@ -341,6 +357,8 @@ struct MessageReactionRemoveEmoji { emoji: Emoji, } +impl WebSocketEvent for MessageReactionRemoveEmoji {} + #[derive(Debug, Serialize, Deserialize)] struct ChannelMention { id: String, @@ -689,6 +707,8 @@ struct TypingStartEvent { member: Option, } +impl WebSocketEvent for TypingStartEvent {} + #[derive(Debug, Deserialize, Serialize)] struct GatewayIdentifyPayload { token: String, @@ -700,6 +720,8 @@ struct GatewayIdentifyPayload { intents: i32, } +impl WebSocketEvent for GatewayIdentifyPayload {} + #[derive(Debug, Deserialize, Serialize)] struct GatewayIdentifyConnectionProps { os: String, @@ -715,6 +737,8 @@ struct PresenceUpdate { afk: bool, } +impl WebSocketEvent for PresenceUpdate {} + #[derive(Debug, Deserialize, Serialize)] struct Activity { name: String, @@ -770,8 +794,10 @@ struct ActivityButton { } #[derive(Debug, Deserialize, Serialize)] -struct GatewayResume { - token: String, - session_id: String, - seq: String, +pub(crate) struct GatewayResume { + pub(crate) token: String, + pub(crate) session_id: String, + pub(crate) seq: String, } + +impl WebSocketEvent for GatewayResume {} diff --git a/src/gateway.rs b/src/gateway.rs index 9a05fec..19a6a48 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,24 +1,24 @@ -use crate::errors::ObserverError; +use crate::{api::WebSocketEvent, errors::ObserverError}; #[derive(Debug)] pub struct Gateway {} -pub trait Observer { - fn update(&self, data: &str); +pub trait Observer { + fn update(&self, data: &T); } -pub struct GatewayEvent<'a> { - observers: Vec<&'a dyn Observer>, - pub test_content: String, +pub struct GatewayEvent<'a, T: WebSocketEvent> { + observers: Vec<&'a dyn Observer>, + pub event_data: T, pub is_observed: bool, } -impl<'a> GatewayEvent<'a> { - fn new(test_content: String) -> Self { +impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { + fn new(event_data: T) -> Self { Self { is_observed: false, observers: Vec::new(), - test_content, + event_data, } } @@ -26,7 +26,7 @@ impl<'a> GatewayEvent<'a> { self.is_observed } - pub fn subscribe(&mut self, observable: &'a dyn Observer) -> Option { + pub fn subscribe(&mut self, observable: &'a dyn Observer) -> Option { if self.is_observed { return Some(ObserverError::AlreadySubscribedError); } @@ -35,20 +35,65 @@ impl<'a> GatewayEvent<'a> { None } - pub fn unsubscribe(&mut self, observable: &'a dyn Observer) { - self.observers.pop(); - self.is_observed = false; + pub fn unsubscribe(&mut self, observable: &'a dyn Observer) { + // .retain()'s closure retains only those elements of the vector, which have a different + // pointer value than observable. + self.observers.retain(|obs| !std::ptr::eq(*obs, observable)); + self.is_observed = !self.observers.is_empty(); return; } - fn update_data(&mut self, test_content: String) { - self.test_content = test_content; + fn update_data(&mut self, new_event_data: T) { + self.event_data = new_event_data; self.notify(); } fn notify(&self) { for observer in &self.observers { - observer.update(&self.test_content); + observer.update(&self.event_data); + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::api::types::GatewayResume; + + struct Consumer; + impl Observer for Consumer { + fn update(&self, data: &GatewayResume) { + println!("{}", data.token) + } + } + + #[test] + fn test_observer_behaviour() { + let mut event = GatewayEvent::new(GatewayResume { + token: "start".to_string(), + session_id: "start".to_string(), + seq: "start".to_string(), + }); + + let new_data = GatewayResume { + token: "token_3276ha37am3".to_string(), + session_id: "89346671230".to_string(), + seq: "3".to_string(), + }; + + let consumer = Consumer; + + event.subscribe(&consumer); + + event.notify(); + + event.update_data(new_data); + + let second_consumer = Consumer; + + match event.subscribe(&second_consumer) { + None => return, + Some(err) => println!("You cannot subscribe twice: {}", err), } } } From 29a7dba4392645a41f5a96614bba50e4a880e82c Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 28 Apr 2023 12:39:58 +0200 Subject: [PATCH 06/11] renamed mod to example --- src/gateway.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 19a6a48..186de02 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -56,7 +56,7 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { } #[cfg(test)] -mod test { +mod example { use super::*; use crate::api::types::GatewayResume; @@ -92,7 +92,7 @@ mod test { let second_consumer = Consumer; match event.subscribe(&second_consumer) { - None => return, + None => assert!(false), Some(err) => println!("You cannot subscribe twice: {}", err), } } From 530c0deb368985d8c07f7976bd0c661e78ab5a9f Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 28 Apr 2023 13:40:29 +0200 Subject: [PATCH 07/11] Add documentation comments --- src/gateway.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/gateway.rs b/src/gateway.rs index 186de02..558e66f 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,12 +1,23 @@ use crate::{api::WebSocketEvent, errors::ObserverError}; #[derive(Debug)] +/** +Represents a Gateway connection. + */ pub struct Gateway {} +/** +Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to +an Observable. The Observer is notified when the Observable's data changes. +In this case, the Observable is a GatewayEvent, which is a wrapper around a WebSocketEvent. + */ pub trait Observer { fn update(&self, data: &T); } +/** GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a + * change in the WebSocketEvent. + */ pub struct GatewayEvent<'a, T: WebSocketEvent> { observers: Vec<&'a dyn Observer>, pub event_data: T, @@ -22,10 +33,20 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { } } + /** + Returns true if the GatewayEvent is observed by at least one Observer. + */ pub fn is_observed(&self) -> bool { self.is_observed } + /** + Subscribes an Observer to the GatewayEvent. Returns an error if the GatewayEvent is already + observed. + # Errors + Returns an error if the GatewayEvent is already observed. + Error type: [`ObserverError::AlreadySubscribedError`] + */ pub fn subscribe(&mut self, observable: &'a dyn Observer) -> Option { if self.is_observed { return Some(ObserverError::AlreadySubscribedError); @@ -35,6 +56,9 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { None } + /** + Unsubscribes an Observer from the GatewayEvent. + */ pub fn unsubscribe(&mut self, observable: &'a dyn Observer) { // .retain()'s closure retains only those elements of the vector, which have a different // pointer value than observable. @@ -43,11 +67,17 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { return; } + /** + Updates the GatewayEvent's data and notifies the observers. + */ fn update_data(&mut self, new_event_data: T) { self.event_data = new_event_data; self.notify(); } + /** + Notifies the observers of the GatewayEvent. + */ fn notify(&self) { for observer in &self.observers { observer.update(&self.event_data); From 82f1b3dcc14ff0301300a5b6ca81aa791fe9e2df Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 28 Apr 2023 18:18:32 +0200 Subject: [PATCH 08/11] Add Default derives to types --- src/api/types.rs | 74 +++++++++++++++++++++---------------------- src/gateway.rs | 82 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 106 insertions(+), 50 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index 393ed01..0ed1755 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -134,7 +134,7 @@ pub struct Error { pub code: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Default)] pub struct UserObject { id: String, username: String, @@ -203,8 +203,8 @@ impl User { } } -#[derive(Debug, Serialize, Deserialize)] -struct Message { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct Message { id: String, channel_id: String, author: UserObject, @@ -239,8 +239,8 @@ struct Message { role_subscription_data: Option, } -#[derive(Debug, Serialize, Deserialize)] -struct MessageCreate { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct MessageCreate { #[serde(flatten)] message: Message, guild_id: Option, @@ -250,7 +250,7 @@ struct MessageCreate { impl WebSocketEvent for MessageCreate {} -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Default)] struct PartialMessage { id: Option, channel_id: Option, @@ -288,8 +288,8 @@ struct PartialMessage { member: Option, } -#[derive(Debug, Serialize, Deserialize)] -struct MessageUpdate { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct MessageUpdate { #[serde(flatten)] message: PartialMessage, guild_id: Option, @@ -299,8 +299,8 @@ struct MessageUpdate { impl WebSocketEvent for MessageUpdate {} -#[derive(Debug, Serialize, Deserialize)] -struct MessageDelete { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct MessageDelete { id: String, channel_id: String, guild_id: Option, @@ -308,8 +308,8 @@ struct MessageDelete { impl WebSocketEvent for MessageDelete {} -#[derive(Debug, Serialize, Deserialize)] -struct MessageDeleteBulk { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct MessageDeleteBulk { ids: Vec, channel_id: String, guild_id: Option, @@ -317,8 +317,8 @@ struct MessageDeleteBulk { impl WebSocketEvent for MessageDeleteBulk {} -#[derive(Debug, Serialize, Deserialize)] -struct MessageReactionAdd { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct MessageReactionAdd { user_id: String, channel_id: String, message_id: String, @@ -329,8 +329,8 @@ struct MessageReactionAdd { impl WebSocketEvent for MessageReactionAdd {} -#[derive(Debug, Serialize, Deserialize)] -struct MessageReactionRemove { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct MessageReactionRemove { user_id: String, channel_id: String, message_id: String, @@ -340,8 +340,8 @@ struct MessageReactionRemove { impl WebSocketEvent for MessageReactionRemove {} -#[derive(Debug, Serialize, Deserialize)] -struct MessageReactionRemoveAll { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct MessageReactionRemoveAll { channel_id: String, message_id: String, guild_id: Option, @@ -349,8 +349,8 @@ struct MessageReactionRemoveAll { impl WebSocketEvent for MessageReactionRemoveAll {} -#[derive(Debug, Serialize, Deserialize)] -struct MessageReactionRemoveEmoji { +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct MessageReactionRemoveEmoji { channel_id: String, message_id: String, guild_id: Option, @@ -463,7 +463,7 @@ struct Reaction { emoji: Emoji, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Default)] struct Emoji { id: Option, name: Option, @@ -698,8 +698,8 @@ struct RoleSubscriptionData { is_renewal: bool, } -#[derive(Debug, Deserialize, Serialize)] -struct TypingStartEvent { +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct TypingStartEvent { channel_id: String, guild_id: Option, user_id: String, @@ -709,8 +709,8 @@ struct TypingStartEvent { impl WebSocketEvent for TypingStartEvent {} -#[derive(Debug, Deserialize, Serialize)] -struct GatewayIdentifyPayload { +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct GatewayIdentifyPayload { token: String, properties: GatewayIdentifyConnectionProps, compress: Option, @@ -722,15 +722,15 @@ struct GatewayIdentifyPayload { impl WebSocketEvent for GatewayIdentifyPayload {} -#[derive(Debug, Deserialize, Serialize)] -struct GatewayIdentifyConnectionProps { - os: String, - browser: String, - device: String, +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct GatewayIdentifyConnectionProps { + pub os: String, + pub browser: String, + pub device: String, } -#[derive(Debug, Deserialize, Serialize)] -struct PresenceUpdate { +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct PresenceUpdate { since: Option, activities: Vec, status: String, @@ -793,11 +793,11 @@ struct ActivityButton { url: String, } -#[derive(Debug, Deserialize, Serialize)] -pub(crate) struct GatewayResume { - pub(crate) token: String, - pub(crate) session_id: String, - pub(crate) seq: String, +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct GatewayResume { + pub token: String, + pub session_id: String, + pub seq: String, } impl WebSocketEvent for GatewayResume {} diff --git a/src/gateway.rs b/src/gateway.rs index 558e66f..84a1f96 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,23 +1,50 @@ -use crate::{api::WebSocketEvent, errors::ObserverError}; +use crate::api::types::*; +use crate::api::WebSocketEvent; +use crate::errors::ObserverError; +use crate::gateway::events::Events; +use crate::URLBundle; +use reqwest::{Client, Url}; +use tokio::net::TcpStream; +use tokio_tungstenite::tungstenite::error::UrlError; +use tokio_tungstenite::tungstenite::Error; +use tokio_tungstenite::MaybeTlsStream; +use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -#[derive(Debug)] /** -Represents a Gateway connection. - */ -pub struct Gateway {} +Represents a Gateway connection. A Gateway connection will create observable +[`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently +implemented [Types] with the trait [`WebSocketEvent`] +*/ +pub struct Gateway<'a> { + pub url: String, + pub events: Events<'a>, + stream: WebSocketStream>, +} + +impl<'a> Gateway<'a> { + pub async fn new(websocket_url: String) { + let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); + if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { + return Err(Error::Url(UrlError::UnsupportedUrlScheme)); + } + } +} /** Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to an Observable. The Observer is notified when the Observable's data changes. -In this case, the Observable is a GatewayEvent, which is a wrapper around a WebSocketEvent. +In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. */ pub trait Observer { fn update(&self, data: &T); } /** GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a - * change in the WebSocketEvent. - */ +change in the WebSocketEvent. GatewayEvents are observable. +*/ + +#[derive(Default)] pub struct GatewayEvent<'a, T: WebSocketEvent> { observers: Vec<&'a dyn Observer>, pub event_data: T, @@ -35,7 +62,7 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { /** Returns true if the GatewayEvent is observed by at least one Observer. - */ + */ pub fn is_observed(&self) -> bool { self.is_observed } @@ -46,7 +73,7 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { # Errors Returns an error if the GatewayEvent is already observed. Error type: [`ObserverError::AlreadySubscribedError`] - */ + */ pub fn subscribe(&mut self, observable: &'a dyn Observer) -> Option { if self.is_observed { return Some(ObserverError::AlreadySubscribedError); @@ -58,7 +85,7 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { /** Unsubscribes an Observer from the GatewayEvent. - */ + */ pub fn unsubscribe(&mut self, observable: &'a dyn Observer) { // .retain()'s closure retains only those elements of the vector, which have a different // pointer value than observable. @@ -69,7 +96,7 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { /** Updates the GatewayEvent's data and notifies the observers. - */ + */ fn update_data(&mut self, new_event_data: T) { self.event_data = new_event_data; self.notify(); @@ -77,7 +104,7 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { /** Notifies the observers of the GatewayEvent. - */ + */ fn notify(&self) { for observer in &self.observers { observer.update(&self.event_data); @@ -85,6 +112,35 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { } } +mod events { + use super::*; + #[derive(Default)] + pub struct Events<'a> { + pub message: Message<'a>, + pub user: User<'a>, + pub gateway_identify_payload: GatewayEvent<'a, GatewayIdentifyPayload>, + pub gateway_resume: GatewayEvent<'a, GatewayResume>, + } + + #[derive(Default)] + pub struct Message<'a> { + pub create: GatewayEvent<'a, MessageCreate>, + pub update: GatewayEvent<'a, MessageUpdate>, + pub delete: GatewayEvent<'a, MessageDelete>, + pub delete_bulk: GatewayEvent<'a, MessageDeleteBulk>, + pub reaction_add: GatewayEvent<'a, MessageReactionAdd>, + pub reaction_remove: GatewayEvent<'a, MessageReactionRemove>, + pub reaction_remove_all: GatewayEvent<'a, MessageReactionRemoveAll>, + pub reaction_remove_emoji: GatewayEvent<'a, MessageReactionRemoveEmoji>, + } + + #[derive(Default)] + pub struct User<'a> { + pub presence_update: GatewayEvent<'a, PresenceUpdate>, + pub typing_start_event: GatewayEvent<'a, TypingStartEvent>, + } +} + #[cfg(test)] mod example { use super::*; From 37ad29f698d653eae58704543090b5390051b32d Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 28 Apr 2023 20:29:40 +0200 Subject: [PATCH 09/11] Working on Gateway impl --- src/gateway.rs | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 84a1f96..f56e0d4 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,15 +1,18 @@ +use std::sync::Arc; +use std::sync::Mutex; +use std::thread::JoinHandle; + use crate::api::types::*; use crate::api::WebSocketEvent; use crate::errors::ObserverError; use crate::gateway::events::Events; use crate::URLBundle; -use reqwest::{Client, Url}; +use reqwest::Url; +use serde_json::to_string; use tokio::net::TcpStream; -use tokio_tungstenite::tungstenite::error::UrlError; use tokio_tungstenite::tungstenite::Error; use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::WebSocketStream; -use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; /** Represents a Gateway connection. A Gateway connection will create observable @@ -19,15 +22,30 @@ implemented [Types] with the trait [`WebSocketEvent`] pub struct Gateway<'a> { pub url: String, pub events: Events<'a>, - stream: WebSocketStream>, + socket: Arc>>>>, + thread_handle: Option>, } impl<'a> Gateway<'a> { - pub async fn new(websocket_url: String) { + pub async fn new(websocket_url: String, token: String) { let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { - return Err(Error::Url(UrlError::UnsupportedUrlScheme)); + //return Err(Error::Url(UrlError::UnsupportedUrlScheme)); } + let payload = GatewayIdentifyPayload { + token: token, + properties: GatewayIdentifyConnectionProps { + os: "any".to_string(), + browser: "chorus-polyphony".to_string(), + device: "chorus-lib".to_string(), + }, + compress: Some(true), + large_threshold: None, + shard: None, + presence: None, + intents: 3276799, + }; + let payload_string = to_string(&payload).unwrap(); } } From 8cded33eda224432f4cc297be27c8733592f873e Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 28 Apr 2023 20:30:03 +0200 Subject: [PATCH 10/11] Make GatewatIdentifyPayload pub --- src/api/types.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index 0ed1755..43ee964 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -711,13 +711,13 @@ impl WebSocketEvent for TypingStartEvent {} #[derive(Debug, Deserialize, Serialize, Default)] pub struct GatewayIdentifyPayload { - token: String, - properties: GatewayIdentifyConnectionProps, - compress: Option, - large_threshold: Option, //default: 50 - shard: Option>, - presence: Option, - intents: i32, + pub token: String, + pub properties: GatewayIdentifyConnectionProps, + pub compress: Option, + pub large_threshold: Option, //default: 50 + pub shard: Option>, + pub presence: Option, + pub intents: i32, } impl WebSocketEvent for GatewayIdentifyPayload {} From 493471dc02b26f92fae402377fe0fd9e90100e9d Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 28 Apr 2023 20:30:31 +0200 Subject: [PATCH 11/11] add rt-multi-thread feature --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b441eb6..1769377 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ license = "AGPL-3" edition = "2021" [dependencies] -tokio = {version = "1.27.0", features = ["rt", "macros"]} +tokio = {version = "1.27.0", features = ["rt", "macros", "rt-multi-thread"]} serde = {version = "1.0.159", features = ["derive"]} serde_json = "1.0.95" reqwest = "0.11.16"