From 284073deeb80cd151d0ea24fd35ce5d767c74fdc Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Thu, 4 May 2023 20:05:33 +0200 Subject: [PATCH 01/34] Add WebSocketConnection to Gateway --- src/gateway.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index 818d14b..d3c6e5f 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -34,6 +34,7 @@ pub struct Gateway<'a> { pub url: String, pub token: String, pub events: Events<'a>, + websocket: WebSocketConnection, } impl<'a> Gateway<'a> { @@ -42,9 +43,10 @@ impl<'a> Gateway<'a> { token: String, ) -> Result, tokio_tungstenite::tungstenite::Error> { return Ok(Gateway { - url: websocket_url, + url: websocket_url.clone(), token, events: Events::default(), + websocket: WebSocketConnection::new(websocket_url).await, }); } } From 0ff3a21423f919cc9edca0d747d696c90ea751e4 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Thu, 4 May 2023 20:57:45 +0200 Subject: [PATCH 02/34] Add basic event receiving --- src/gateway.rs | 100 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/src/gateway.rs b/src/gateway.rs index d3c6e5f..ed5c333 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -49,6 +49,106 @@ impl<'a> Gateway<'a> { websocket: WebSocketConnection::new(websocket_url).await, }); } + + /// This function reads all messages from the gateway's websocket and updates its events along with the events' observers + pub async fn update_events(&mut self) { + while let Some(msg) = self.websocket.rx.lock().await.recv().await { + let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); + + // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes + match gateway_payload.op { + // Dispatch + // An event was dispatched, we need to look at the gateway event name t + 0 => { + let gateway_payload_t = gateway_payload.t.unwrap(); + + // See https://discord.com/developers/docs/topics/gateway-events#receive-events + match gateway_payload_t.as_str() { + "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} + "AUTO_MODERATION_RULE_CREATE" => {} + "AUTO_MODERATION_RULE_UPDATE" => {} + "AUTO_MODERATION_RULE_DELETE" => {} + "AUTO_MODERATION_ACTION_EXECUTION" => {} + "CHANNEL_CREATE" => {} + "CHANNEL_UPDATE" => {} + "CHANNEL_DELETE" => {} + "CHANNEL_PINS_UPDATE" => {} + "THREAD_CREATE" => {} + "THREAD_UPDATE" => {} + "THREAD_DELETE" => {} + "THREAD_LIST_SYNC" => {} + "THREAD_MEMBER_UPDATE" => {} + "THREAD_MEMBERS_UPDATE" => {} + "GUILD_CREATE" => {} + "GUILD_UPDATE" => {} + "GUILD_DELETE" => {} + "GUILD_AUDIT_LOG_ENTRY_CREATE" => {} + "GUILD_BAN_ADD" => {} + "GUILD_BAN_REMOVE" => {} + "GUILD_EMOJIS_UPDATE" => {} + "GUILD_STICKERS_UPDATE" => {} + "GUILD_INTEGRATIONS_UPDATE" => {} + "GUILD_MEMBER_ADD" => {} + "GUILD_MEMBER_REMOVE" => {} + "GUILD_MEMBER_UPDATE" => {} + "GUILD_MEMBERS_CHUNK" => {} + "GUILD_ROLE_CREATE" => {} + "GUILD_ROLE_UPDATE" => {} + "GUILD_ROLE_DELETE" => {} + "GUILD_SCHEDULED_EVENT_CREATE" => {} + "GUILD_SCHEDULED_EVENT_UPDATE" => {} + "GUILD_SCHEDULED_EVENT_DELETE" => {} + "GUILD_SCHEDULED_EVENT_USER_ADD" => {} + "GUILD_SCHEDULED_EVENT_USER_REMOVE" => {} + "INTEGRATION_CREATE" => {} + "INTEGRATION_UPDATE" => {} + "INTEGRATION_DELETE" => {} + "INTERACTION_CREATE" => {} + "INVITE_CREATE" => {} + "INVITE_DELETE" => {} + "MESSAGE_CREATE" => {} + "MESSAGE_UPDATE" => {} + "MESSAGE_DELETE" => {} + "MESSAGE_DELETE_BULK" => {} + "MESSAGE_REACTION_ADD" => {} + "MESSAGE_REACTION_REMOVE" => {} + "MESSAGE_REACTION_REMOVE_ALL" => {} + "MESSAGE_REACTION_REMOVE_EMOJI" => {} + "PRESENCE_UPDATE" => {} + "STAGE_INSTANCE_CREATE" => {} + "STAGE_INSTANCE_UPDATE" => {} + "STAGE_INSTANCE_DELETE" => {} + "TYPING_START" => {} + "USER_UPDATE" => {} + "VOICE_STATE_UPDATE" => {} + "VOICE_SERVER_UPDATE" => {} + "WEBHOOKS_UPDATE" => {} + _ => {panic!("Invalid gateway event ({})", &gateway_payload_t)} + } + } + // Heartbeat + // We received a heartbeat from the server + 1 => { + let gateway_heartbeat: GatewayHeartbeat = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + } + // Reconnect + 7 => {todo!()} + // Invalid Session + 9 => {todo!()} + // Hello + // Should start our heartbeat + 10 => { + let gateway_hello: GatewayHello = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + } + // Heartbeat ACK + 11 => { + let gateway_hb_ack: GatewayHeartbeatAck = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + } + 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} + _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} + } + } + } } struct WebSocketConnection { From 28f3312cacf629ee41fed51d147dc74c94c39f98 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 15:39:11 +0200 Subject: [PATCH 03/34] No need to serialize this --- src/gateway.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index ed5c333..80cba24 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -128,9 +128,7 @@ impl<'a> Gateway<'a> { } // Heartbeat // We received a heartbeat from the server - 1 => { - let gateway_heartbeat: GatewayHeartbeat = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); - } + 1 => {} // Reconnect 7 => {todo!()} // Invalid Session @@ -138,12 +136,10 @@ impl<'a> Gateway<'a> { // Hello // Should start our heartbeat 10 => { - let gateway_hello: GatewayHello = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + let gateway_hello: HelloData = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); } // Heartbeat ACK - 11 => { - let gateway_hb_ack: GatewayHeartbeatAck = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); - } + 11 => {} 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} } From 2dda6f767e44fd1364aa74d80b2a67512f5d9cd9 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 15:46:45 +0200 Subject: [PATCH 04/34] Update data for existing events --- src/gateway.rs | 50 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 80cba24..012e764 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -106,19 +106,49 @@ impl<'a> Gateway<'a> { "INTERACTION_CREATE" => {} "INVITE_CREATE" => {} "INVITE_DELETE" => {} - "MESSAGE_CREATE" => {} - "MESSAGE_UPDATE" => {} - "MESSAGE_DELETE" => {} - "MESSAGE_DELETE_BULK" => {} - "MESSAGE_REACTION_ADD" => {} - "MESSAGE_REACTION_REMOVE" => {} - "MESSAGE_REACTION_REMOVE_ALL" => {} - "MESSAGE_REACTION_REMOVE_EMOJI" => {} - "PRESENCE_UPDATE" => {} + "MESSAGE_CREATE" => { + let new_data: MessageCreate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.create.update_data(new_data); + } + "MESSAGE_UPDATE" => { + let new_data: MessageUpdate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.update.update_data(new_data); + } + "MESSAGE_DELETE" => { + let new_data: MessageDelete = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.delete.update_data(new_data); + } + "MESSAGE_DELETE_BULK" => { + let new_data: MessageDeleteBulk = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.delete_bulk.update_data(new_data); + } + "MESSAGE_REACTION_ADD" => { + let new_data: MessageReactionAdd = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.reaction_add.update_data(new_data); + } + "MESSAGE_REACTION_REMOVE" => { + let new_data: MessageReactionRemove = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.reaction_remove.update_data(new_data); + } + "MESSAGE_REACTION_REMOVE_ALL" => { + let new_data: MessageReactionRemoveAll = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.reaction_remove_all.update_data(new_data); + } + "MESSAGE_REACTION_REMOVE_EMOJI" => { + let new_data: MessageReactionRemoveEmoji= serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.reaction_remove_emoji.update_data(new_data); + } + "PRESENCE_UPDATE" => { + let new_data: PresenceUpdate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.user.presence_update.update_data(new_data); + } "STAGE_INSTANCE_CREATE" => {} "STAGE_INSTANCE_UPDATE" => {} "STAGE_INSTANCE_DELETE" => {} - "TYPING_START" => {} + "TYPING_START" => { + let new_data: TypingStartEvent = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.user.typing_start_event.update_data(new_data); + } "USER_UPDATE" => {} "VOICE_STATE_UPDATE" => {} "VOICE_SERVER_UPDATE" => {} From ec2030794d8c9f7c85a9dfdbef725fa9f5662094 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 16:08:12 +0200 Subject: [PATCH 05/34] Add gateway Ready event --- src/api/types.rs | 18 ++++++++++++++++++ src/gateway.rs | 3 +++ 2 files changed, 21 insertions(+) diff --git a/src/api/types.rs b/src/api/types.rs index 2efbc1a..2bed6b5 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -134,6 +134,12 @@ pub struct Error { pub code: String, } +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct UnavailableGuild { + id: String, + unavailable: bool +} + #[derive(Serialize, Deserialize, Debug, Default)] pub struct UserObject { id: String, @@ -802,6 +808,18 @@ pub struct GatewayResume { impl WebSocketEvent for GatewayResume {} +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct GatewayReady { + pub v: u8, + pub user: UserObject, + pub guilds: Vec, + pub session_id: String, + pub resume_gateway_url: String, + pub shard: Option<(u64, u64)>, +} + +impl WebSocketEvent for GatewayReady {} + #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayHello { pub op: i32, diff --git a/src/gateway.rs b/src/gateway.rs index 012e764..83cf07c 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -64,6 +64,9 @@ impl<'a> Gateway<'a> { // See https://discord.com/developers/docs/topics/gateway-events#receive-events match gateway_payload_t.as_str() { + "READY" => { + let data: GatewayReady = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + } "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} "AUTO_MODERATION_RULE_CREATE" => {} "AUTO_MODERATION_RULE_UPDATE" => {} From 23472d01d954595ba1cd35b5e6e1de4920dffd4f Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 18:37:52 +0200 Subject: [PATCH 06/34] Update integers on some types --- src/api/types.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index 2bed6b5..40f7f8f 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -830,7 +830,7 @@ impl WebSocketEvent for GatewayHello {} #[derive(Debug, Default, Deserialize, Serialize)] pub struct HelloData { - pub heartbeat_interval: i32, + pub heartbeat_interval: u128, } impl WebSocketEvent for HelloData {} @@ -838,7 +838,7 @@ impl WebSocketEvent for HelloData {} #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayHeartbeat { pub op: u8, - pub d: u64, + pub d: Option, } impl WebSocketEvent for GatewayHeartbeat {} @@ -852,9 +852,9 @@ impl WebSocketEvent for GatewayHeartbeatAck {} #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayPayload { - pub op: i32, + pub op: u8, pub d: Option, - pub s: Option, + pub s: Option, pub t: Option, } From 85d79bb304bed699e5d771f8f53010015e0a5174 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 18:38:04 +0200 Subject: [PATCH 07/34] Experimental heartbeats --- src/gateway.rs | 82 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index 83cf07c..94beeb1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -17,9 +17,12 @@ use serde::Serialize; use serde_json::from_str; use tokio::io; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task; +use tokio::time; +use tokio::time::Instant; use tokio_tungstenite::tungstenite::error::UrlError; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; @@ -35,6 +38,7 @@ pub struct Gateway<'a> { pub token: String, pub events: Events<'a>, websocket: WebSocketConnection, + heartbeat_handler: Option } impl<'a> Gateway<'a> { @@ -47,6 +51,7 @@ impl<'a> Gateway<'a> { token, events: Events::default(), websocket: WebSocketConnection::new(websocket_url).await, + heartbeat_handler: None, }); } @@ -167,19 +172,94 @@ impl<'a> Gateway<'a> { // Invalid Session 9 => {todo!()} // Hello - // Should start our heartbeat + // Starts our heartbeat 10 => { let gateway_hello: HelloData = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone())); } // Heartbeat ACK 11 => {} 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} } + + // If we have an active heartbeat thread and we received a seq number we should let it know + if gateway_payload.s.is_some() { + if self.heartbeat_handler.is_some() { + + let heartbeat_communication = HeartbeatThreadCommunication { op: gateway_payload.op, d: gateway_payload.s.unwrap() }; + + self.heartbeat_handler.as_mut().unwrap().tx.send(heartbeat_communication).await.unwrap(); + } + } } } } +/** +Handles sending heartbeats to the gateway in another thread +*/ +struct HeartbeatHandler { + /// The heartbeat interval in milliseconds + heartbeat_interval: u128, + tx: Sender, +} + +impl HeartbeatHandler { + pub fn new(heartbeat_interval: u128, websocket_tx: Arc>>) -> HeartbeatHandler { + let (mut tx, mut rx) = mpsc::channel(32); + + task::spawn(async move { + let mut last_heartbeat: Instant = time::Instant::now(); + let mut last_seq_number: Option = None; + + loop { + + // If we received a seq number update, use that as the last seq number + let hb_communication: Option = rx.recv().await; + while hb_communication.is_some() { + last_seq_number = Some(hb_communication.unwrap().d); + } + + if last_heartbeat.elapsed().as_millis() > heartbeat_interval { + + let heartbeat = GatewayHeartbeat { + op: 1, + d: last_seq_number + }; + + let heartbeat_json = serde_json::to_string(&heartbeat).unwrap(); + + let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); + + websocket_tx.lock() + .await + .send(msg) + .await + .unwrap(); + + last_heartbeat = time::Instant::now(); + } + + } + }); + + Self { heartbeat_interval, tx } + } +} + +/** +Used to communicate with the main thread. +Either signifies a sequence number update or a received heartbeat ack +*/ +#[derive(Clone, Copy, Debug)] +struct HeartbeatThreadCommunication { + /// An opcode for the communication we received + op: u8, + /// The sequence number we got from discord + d: u64 +} + struct WebSocketConnection { rx: Arc>>, tx: Arc>>, From f6c9d5a80775f95e8186e4a1163b48a82005bd37 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 19:23:57 +0200 Subject: [PATCH 08/34] Experimental sending to gateway --- src/gateway.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/gateway.rs b/src/gateway.rs index 94beeb1..6241e2a 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -194,6 +194,42 @@ impl<'a> Gateway<'a> { } } } + + /// Sends json to the gateway with an opcode + async fn send_json_event(&self, op: u8, to_send: String) { + + let gateway_payload: GatewayPayload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; + + let payload_json = serde_json::to_string(&gateway_payload).unwrap(); + + let message = tokio_tungstenite::tungstenite::Message::text(payload_json); + + self.websocket.tx.lock().await.send(message).await.unwrap(); + } + + /// Sends an identify event to the gateway + pub async fn send_identify(&self, to_send: GatewayIdentifyPayload) { + + let to_send_json = serde_json::to_string(&to_send).unwrap(); + + self.send_json_event(2, to_send_json).await; + } + + /// Sends a resume event to the gateway + pub async fn send_resume(&self, to_send: GatewayResume) { + + let to_send_json = serde_json::to_string(&to_send).unwrap(); + + self.send_json_event(6, to_send_json).await; + } + + /// Sends an update presence event to the gateway + pub async fn send_update_presence(&self, to_send: PresenceUpdate) { + + let to_send_json = serde_json::to_string(&to_send).unwrap(); + + self.send_json_event(3, to_send_json).await; + } } /** From 6a99129fe770dd271e3e353f2ff2bdbe4c82d903 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 19:42:31 +0200 Subject: [PATCH 09/34] Add temp debug, remove unused token --- src/gateway.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 6241e2a..df9f230 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -35,7 +35,6 @@ implemented [Types] with the trait [`WebSocketEvent`] */ pub struct Gateway<'a> { pub url: String, - pub token: String, pub events: Events<'a>, websocket: WebSocketConnection, heartbeat_handler: Option @@ -44,11 +43,9 @@ pub struct Gateway<'a> { impl<'a> Gateway<'a> { pub async fn new( websocket_url: String, - token: String, ) -> Result, tokio_tungstenite::tungstenite::Error> { return Ok(Gateway { url: websocket_url.clone(), - token, events: Events::default(), websocket: WebSocketConnection::new(websocket_url).await, heartbeat_handler: None, @@ -58,6 +55,7 @@ impl<'a> Gateway<'a> { /// This function reads all messages from the gateway's websocket and updates its events along with the events' observers pub async fn update_events(&mut self) { while let Some(msg) = self.websocket.rx.lock().await.recv().await { + println!("Debug GW: Received WSE: {}", msg.to_string()); let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes @@ -202,6 +200,8 @@ impl<'a> Gateway<'a> { let payload_json = serde_json::to_string(&gateway_payload).unwrap(); + println!("Debug GW: Sending WSE: {}", payload_json.clone()); + let message = tokio_tungstenite::tungstenite::Message::text(payload_json); self.websocket.tx.lock().await.send(message).await.unwrap(); @@ -513,7 +513,7 @@ mod example { #[tokio::test] async fn test_gateway() { - let gateway = Gateway::new("ws://localhost:3001/".to_string(), "none".to_string()) + let gateway = Gateway::new("ws://localhost:3001/".to_string()) .await .unwrap(); } From 47c38c55415a5f4b57bfb12913d856472249cce9 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 20:26:51 +0200 Subject: [PATCH 10/34] Give websocket a send and receive channel --- src/gateway.rs | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index df9f230..08e6ad5 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -310,15 +310,25 @@ impl<'a> WebSocketConnection { )); }*/ - let (mut channel_write, mut channel_read): ( + let (mut send_channel_write, mut send_channel_read): ( Sender, Receiver, ) = channel(32); - let shared_channel_write = Arc::new(Mutex::new(channel_write)); - let clone_shared_channel_write = shared_channel_write.clone(); - let shared_channel_read = Arc::new(Mutex::new(channel_read)); - let clone_shared_channel_read = shared_channel_read.clone(); + let (mut receive_channel_write, mut receive_channel_read): ( + Sender, + Receiver, + ) = channel(32); + + let shared_send_channel_write = Arc::new(Mutex::new(send_channel_write)); + let shared_send_channel_read = Arc::new(Mutex::new(send_channel_read)); + + let clone_shared_send_channel_write = shared_send_channel_write.clone(); + + let shared_receive_channel_read = Arc::new(Mutex::new(receive_channel_read)); + let shared_receive_channel_write = Arc::new(Mutex::new(receive_channel_write)); + + let clone_shared_receive_channel_read = shared_receive_channel_read.clone(); task::spawn(async move { let (mut ws_stream, _) = match connect_async_tls_with_config( @@ -336,14 +346,14 @@ impl<'a> WebSocketConnection { ))*/ }; - let (mut write_tx, mut write_rx) = ws_stream.split(); + let (mut ws_tx, mut ws_rx) = ws_stream.split(); - while let Some(msg) = shared_channel_read.lock().await.recv().await { - write_tx.send(msg).await.unwrap(); + while let Some(msg) = shared_send_channel_read.lock().await.recv().await { + ws_tx.send(msg).await.unwrap(); } - let event = while let Some(msg) = write_rx.next().await { - shared_channel_write + let event = while let Some(msg) = ws_rx.next().await { + shared_receive_channel_write .lock() .await .send(msg.unwrap()) @@ -353,8 +363,8 @@ impl<'a> WebSocketConnection { }); WebSocketConnection { - tx: clone_shared_channel_write, - rx: clone_shared_channel_read, + tx: clone_shared_send_channel_write, + rx: clone_shared_receive_channel_read, } } } @@ -513,8 +523,18 @@ mod example { #[tokio::test] async fn test_gateway() { - let gateway = Gateway::new("ws://localhost:3001/".to_string()) + let mut gateway = Gateway::new("wss://localhost:3001/".to_string()) .await .unwrap(); + + let mut test_indetify = GatewayIdentifyPayload::default(); + test_indetify.intents = 1000; + test_indetify.token = "haaaaeeuuuggghh".to_string(); + test_indetify.properties = GatewayIdentifyConnectionProps { os: "Linux".to_string(), browser: "Very Poggers Discord Client".to_string(), device: "Some device idfk".to_string()}; + gateway.send_identify(test_indetify).await; + + loop { + gateway.update_events().await; + } } } From ab4c20562c665ccdb66570430f53e0221b2bf9f8 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 20:28:47 +0200 Subject: [PATCH 11/34] Remove the dumb test stuff I left in --- src/gateway.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 08e6ad5..9c961ed 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -523,18 +523,8 @@ mod example { #[tokio::test] async fn test_gateway() { - let mut gateway = Gateway::new("wss://localhost:3001/".to_string()) + let mut gateway = Gateway::new("ws://localhost:3001/".to_string()) .await .unwrap(); - - let mut test_indetify = GatewayIdentifyPayload::default(); - test_indetify.intents = 1000; - test_indetify.token = "haaaaeeuuuggghh".to_string(); - test_indetify.properties = GatewayIdentifyConnectionProps { os: "Linux".to_string(), browser: "Very Poggers Discord Client".to_string(), device: "Some device idfk".to_string()}; - gateway.send_identify(test_indetify).await; - - loop { - gateway.update_events().await; - } } } From 7ac4da8482128d549a8d71d9fe73e9bb430b14b6 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 20:58:00 +0200 Subject: [PATCH 12/34] Fix encoding wrong --- src/api/types.rs | 2 +- src/gateway.rs | 40 ++++++++++++++++++++-------------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index 40f7f8f..16cccda 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -853,7 +853,7 @@ impl WebSocketEvent for GatewayHeartbeatAck {} #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayPayload { pub op: u8, - pub d: Option, + pub d: Option, pub s: Option, pub t: Option, } diff --git a/src/gateway.rs b/src/gateway.rs index 9c961ed..faba607 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -68,7 +68,7 @@ impl<'a> Gateway<'a> { // See https://discord.com/developers/docs/topics/gateway-events#receive-events match gateway_payload_t.as_str() { "READY" => { - let data: GatewayReady = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); } "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} "AUTO_MODERATION_RULE_CREATE" => {} @@ -113,46 +113,46 @@ impl<'a> Gateway<'a> { "INVITE_CREATE" => {} "INVITE_DELETE" => {} "MESSAGE_CREATE" => { - let new_data: MessageCreate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageCreate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.create.update_data(new_data); } "MESSAGE_UPDATE" => { - let new_data: MessageUpdate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.update.update_data(new_data); } "MESSAGE_DELETE" => { - let new_data: MessageDelete = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageDelete = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.delete.update_data(new_data); } "MESSAGE_DELETE_BULK" => { - let new_data: MessageDeleteBulk = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageDeleteBulk = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.delete_bulk.update_data(new_data); } "MESSAGE_REACTION_ADD" => { - let new_data: MessageReactionAdd = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageReactionAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.reaction_add.update_data(new_data); } "MESSAGE_REACTION_REMOVE" => { - let new_data: MessageReactionRemove = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageReactionRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.reaction_remove.update_data(new_data); } "MESSAGE_REACTION_REMOVE_ALL" => { - let new_data: MessageReactionRemoveAll = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageReactionRemoveAll = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.reaction_remove_all.update_data(new_data); } "MESSAGE_REACTION_REMOVE_EMOJI" => { - let new_data: MessageReactionRemoveEmoji= serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageReactionRemoveEmoji= serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.reaction_remove_emoji.update_data(new_data); } "PRESENCE_UPDATE" => { - let new_data: PresenceUpdate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: PresenceUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.user.presence_update.update_data(new_data); } "STAGE_INSTANCE_CREATE" => {} "STAGE_INSTANCE_UPDATE" => {} "STAGE_INSTANCE_DELETE" => {} "TYPING_START" => { - let new_data: TypingStartEvent = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.user.typing_start_event.update_data(new_data); } "USER_UPDATE" => {} @@ -172,7 +172,7 @@ impl<'a> Gateway<'a> { // Hello // Starts our heartbeat 10 => { - let gateway_hello: HelloData = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone())); } // Heartbeat ACK @@ -194,9 +194,9 @@ impl<'a> Gateway<'a> { } /// Sends json to the gateway with an opcode - async fn send_json_event(&self, op: u8, to_send: String) { + async fn send_json_event(&self, op: u8, to_send: serde_json::Value) { - let gateway_payload: GatewayPayload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; + let gateway_payload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; let payload_json = serde_json::to_string(&gateway_payload).unwrap(); @@ -210,25 +210,25 @@ impl<'a> Gateway<'a> { /// Sends an identify event to the gateway pub async fn send_identify(&self, to_send: GatewayIdentifyPayload) { - let to_send_json = serde_json::to_string(&to_send).unwrap(); + let to_send_value = serde_json::to_value(&to_send).unwrap(); - self.send_json_event(2, to_send_json).await; + self.send_json_event(2, to_send_value).await; } /// Sends a resume event to the gateway pub async fn send_resume(&self, to_send: GatewayResume) { - let to_send_json = serde_json::to_string(&to_send).unwrap(); + let to_send_value = serde_json::to_value(&to_send).unwrap(); - self.send_json_event(6, to_send_json).await; + self.send_json_event(6, to_send_value).await; } /// Sends an update presence event to the gateway pub async fn send_update_presence(&self, to_send: PresenceUpdate) { - let to_send_json = serde_json::to_string(&to_send).unwrap(); + let to_send_value = serde_json::to_value(&to_send).unwrap(); - self.send_json_event(3, to_send_json).await; + self.send_json_event(3, to_send_value).await; } } From 22bfe1be074c0611489b03849175aa91992cae10 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 22:04:57 +0200 Subject: [PATCH 13/34] Readd gateway to instance --- src/api/auth/login.rs | 2 +- src/api/auth/register.rs | 2 +- src/api/policies/instance/instance.rs | 2 +- src/instance.rs | 11 ++++++----- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/api/auth/login.rs b/src/api/auth/login.rs index f7c4fe6..b419279 100644 --- a/src/api/auth/login.rs +++ b/src/api/auth/login.rs @@ -8,7 +8,7 @@ pub mod login { use crate::errors::InstanceServerError; use crate::instance::Instance; - impl Instance { + impl Instance<'_> { pub async fn login_account( &mut self, login_schema: &LoginSchema, diff --git a/src/api/auth/register.rs b/src/api/auth/register.rs index b932b9f..df7d8dc 100644 --- a/src/api/auth/register.rs +++ b/src/api/auth/register.rs @@ -8,7 +8,7 @@ pub mod register { instance::{Instance, Token}, }; - impl Instance { + impl Instance<'_> { /** Registers a new user on the Spacebar server. # Arguments diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index 7c1a48a..1c53b27 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -5,7 +5,7 @@ pub mod instance { use crate::errors::InstanceServerError; use crate::{api::types::InstancePolicies, instance::Instance}; - impl Instance { + impl Instance<'_> { /** Gets the instance policies schema. # Errors diff --git a/src/instance.rs b/src/instance.rs index 3e52e88..b1cb153 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -1,26 +1,26 @@ use crate::api::limits::Limits; use crate::api::types::{InstancePolicies, User}; use crate::errors::{FieldFormatError, InstanceServerError}; +use crate::gateway::Gateway; use crate::limit::LimitedRequester; use crate::URLBundle; use std::collections::HashMap; use std::fmt; -#[derive(Debug)] /** The [`Instance`] what you will be using to perform all sorts of actions on the Spacebar server. */ -pub struct Instance { +pub struct Instance<'a> { pub urls: URLBundle, pub instance_info: InstancePolicies, pub requester: LimitedRequester, pub limits: Limits, - //pub gateway: Gateway, + pub gateway: Gateway<'a>, pub users: HashMap, } -impl Instance { +impl Instance<'_> { /// Creates a new [`Instance`]. /// # Arguments /// * `urls` - The [`URLBundle`] that contains all the URLs that are needed to connect to the Spacebar server. @@ -30,7 +30,7 @@ impl Instance { pub async fn new( urls: URLBundle, requester: LimitedRequester, - ) -> Result { + ) -> Result, InstanceServerError> { let users: HashMap = HashMap::new(); let mut instance = Instance { urls: urls.clone(), @@ -48,6 +48,7 @@ impl Instance { limits: Limits::check_limits(urls.api).await, requester, users, + gateway: Gateway::new(urls.wss.clone()).await.unwrap(), }; instance.instance_info = match instance.instance_policies_schema().await { Ok(schema) => schema, From a4fdf181064e92f075f305ce0afc73870e64d6cb Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 22:46:00 +0200 Subject: [PATCH 14/34] Fixed instance lifetime parameter --- src/api/auth/login.rs | 2 +- src/api/auth/register.rs | 2 +- src/api/policies/instance/instance.rs | 2 +- src/instance.rs | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/api/auth/login.rs b/src/api/auth/login.rs index b419279..23add11 100644 --- a/src/api/auth/login.rs +++ b/src/api/auth/login.rs @@ -8,7 +8,7 @@ pub mod login { use crate::errors::InstanceServerError; use crate::instance::Instance; - impl Instance<'_> { + impl<'a> Instance<'a> { pub async fn login_account( &mut self, login_schema: &LoginSchema, diff --git a/src/api/auth/register.rs b/src/api/auth/register.rs index df7d8dc..b4d4fd1 100644 --- a/src/api/auth/register.rs +++ b/src/api/auth/register.rs @@ -8,7 +8,7 @@ pub mod register { instance::{Instance, Token}, }; - impl Instance<'_> { + impl<'a> Instance<'a> { /** Registers a new user on the Spacebar server. # Arguments diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index 1c53b27..f7a5653 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -5,7 +5,7 @@ pub mod instance { use crate::errors::InstanceServerError; use crate::{api::types::InstancePolicies, instance::Instance}; - impl Instance<'_> { + impl<'a> Instance<'a> { /** Gets the instance policies schema. # Errors diff --git a/src/instance.rs b/src/instance.rs index b1cb153..5ebcbaa 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -20,7 +20,7 @@ pub struct Instance<'a> { pub users: HashMap, } -impl Instance<'_> { +impl<'a> Instance<'a> { /// Creates a new [`Instance`]. /// # Arguments /// * `urls` - The [`URLBundle`] that contains all the URLs that are needed to connect to the Spacebar server. @@ -30,7 +30,7 @@ impl Instance<'_> { pub async fn new( urls: URLBundle, requester: LimitedRequester, - ) -> Result, InstanceServerError> { + ) -> Result, InstanceServerError> { let users: HashMap = HashMap::new(); let mut instance = Instance { urls: urls.clone(), From 7dbdcf828de6a35df1517d180ee77bc3c4b47dec Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 08:39:41 +0200 Subject: [PATCH 15/34] Slight code cleanup --- src/gateway.rs | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index faba607..a1a8d7c 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -321,14 +321,7 @@ impl<'a> WebSocketConnection { ) = channel(32); let shared_send_channel_write = Arc::new(Mutex::new(send_channel_write)); - let shared_send_channel_read = Arc::new(Mutex::new(send_channel_read)); - - let clone_shared_send_channel_write = shared_send_channel_write.clone(); - let shared_receive_channel_read = Arc::new(Mutex::new(receive_channel_read)); - let shared_receive_channel_write = Arc::new(Mutex::new(receive_channel_write)); - - let clone_shared_receive_channel_read = shared_receive_channel_read.clone(); task::spawn(async move { let (mut ws_stream, _) = match connect_async_tls_with_config( @@ -348,14 +341,14 @@ impl<'a> WebSocketConnection { let (mut ws_tx, mut ws_rx) = ws_stream.split(); - while let Some(msg) = shared_send_channel_read.lock().await.recv().await { + // Send messages from the send channel + while let Some(msg) = send_channel_read.recv().await { ws_tx.send(msg).await.unwrap(); } - let event = while let Some(msg) = ws_rx.next().await { - shared_receive_channel_write - .lock() - .await + // Write received messages to the receive channel + while let Some(msg) = ws_rx.next().await { + receive_channel_write .send(msg.unwrap()) .await .unwrap(); @@ -363,8 +356,8 @@ impl<'a> WebSocketConnection { }); WebSocketConnection { - tx: clone_shared_send_channel_write, - rx: clone_shared_receive_channel_read, + tx: shared_send_channel_write, + rx: shared_receive_channel_read, } } } @@ -526,5 +519,9 @@ mod example { let mut gateway = Gateway::new("ws://localhost:3001/".to_string()) .await .unwrap(); + gateway.send_identify(GatewayIdentifyPayload::default()).await; + loop { + gateway.update_events().await; + } } } From 6fdff9772262624b0fe8dfce62a952415280ab5c Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 10:39:16 +0200 Subject: [PATCH 16/34] Update types to fix deserialization errors --- src/api/types.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index 16cccda..cf0e0c4 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -151,10 +151,10 @@ pub struct UserObject { mfa_enabled: Option, banner: Option, accent_color: Option, - locale: String, + locale: Option, verified: Option, email: Option, - flags: i8, + flags: String, // Not sure why flags is a string, but real responses from the gateway give this is an integer in string format premium_type: Option, public_flags: Option, } @@ -814,7 +814,7 @@ pub struct GatewayReady { pub user: UserObject, pub guilds: Vec, pub session_id: String, - pub resume_gateway_url: String, + pub resume_gateway_url: Option, pub shard: Option<(u64, u64)>, } @@ -858,4 +858,4 @@ pub struct GatewayPayload { pub t: Option, } -impl WebSocketEvent for GatewayPayload {} +impl WebSocketEvent for GatewayPayload {} \ No newline at end of file From 172c00081306c935fc1265726576822ef7716589 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 10:39:58 +0200 Subject: [PATCH 17/34] Update Websocket to fix premature closing w 1006 --- src/gateway.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index a1a8d7c..dbf52b2 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -341,18 +341,25 @@ impl<'a> WebSocketConnection { let (mut ws_tx, mut ws_rx) = ws_stream.split(); - // Send messages from the send channel - while let Some(msg) = send_channel_read.recv().await { - ws_tx.send(msg).await.unwrap(); - } + loop { - // Write received messages to the receive channel - while let Some(msg) = ws_rx.next().await { - receive_channel_write - .send(msg.unwrap()) - .await - .unwrap(); - }; + // Write received messages to the receive channel + let msg = ws_rx.next().await; + if msg.as_ref().is_some() { + let msg_unwrapped = msg.unwrap().unwrap(); + receive_channel_write + .send(msg_unwrapped) + .await + .unwrap(); + }; + + // Send messages from the send channel + let msg = send_channel_read.recv().await; + if msg.as_ref().is_some() { + let msg_unwrapped = msg.unwrap(); + ws_tx.send(msg_unwrapped).await.unwrap(); + } + } }); WebSocketConnection { From 7d3077eec4c6ddf7bcb0a83c42a3ba49f1bfa1f9 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 11:13:07 +0200 Subject: [PATCH 18/34] Pub mod everything so it can be used as a library --- src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 00170d0..e7699e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ -mod api; -mod errors; -mod gateway; -mod instance; -mod limit; -mod voice; +pub mod api; +pub mod errors; +pub mod gateway; +pub mod instance; +pub mod limit; +pub mod voice; use url::{ParseError, Url}; #[derive(Clone, Default, Debug, PartialEq, Eq)] From 9f187b88642038f78325e2f4b8be6dc0ddd825f8 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 11:14:38 +0200 Subject: [PATCH 19/34] Add a debug to heartbeat thread --- src/gateway.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/gateway.rs b/src/gateway.rs index dbf52b2..756f7f7 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -259,6 +259,8 @@ impl HeartbeatHandler { if last_heartbeat.elapsed().as_millis() > heartbeat_interval { + println!("Debug GW: Sending Heartbeat"); + let heartbeat = GatewayHeartbeat { op: 1, d: last_seq_number From fc93762cf7d1bfe296a6941e39146996afb07c8e Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 11:47:12 +0200 Subject: [PATCH 20/34] Fix deserialization error --- src/api/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/types.rs b/src/api/types.rs index cf0e0c4..456be43 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -740,7 +740,7 @@ pub struct PresenceUpdate { since: Option, activities: Vec, status: String, - afk: bool, + afk: Option, } impl WebSocketEvent for PresenceUpdate {} From b4888d2f702b83d34b9b6a1adf8ac53b5d0cd561 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 11:58:12 +0200 Subject: [PATCH 21/34] Rethink websockets, fix thread blocks --- src/gateway.rs | 92 +++++++++++++++++++------------------------------- 1 file changed, 35 insertions(+), 57 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 756f7f7..6f65f57 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,32 +1,24 @@ use std::sync::Arc; -use std::thread; use crate::api::types::*; use crate::api::WebSocketEvent; use crate::errors::ObserverError; use crate::gateway::events::Events; -use crate::URLBundle; -use futures_util::stream::{FilterMap, SplitSink, SplitStream}; use futures_util::SinkExt; use futures_util::StreamExt; +use futures_util::stream::SplitSink; use native_tls::TlsConnector; -use reqwest::Url; -use serde::Deserialize; -use serde::Serialize; -use serde_json::from_str; -use tokio::io; use tokio::net::TcpStream; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task; use tokio::time; use tokio::time::Instant; -use tokio_tungstenite::tungstenite::error::UrlError; -use tokio_tungstenite::WebSocketStream; -use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; -use tokio_tungstenite::{Connector, MaybeTlsStream}; +use tokio_tungstenite::MaybeTlsStream; +use tokio_tungstenite::{WebSocketStream, Connector, connect_async_tls_with_config}; /** Represents a Gateway connection. A Gateway connection will create observable @@ -54,7 +46,13 @@ impl<'a> Gateway<'a> { /// This function reads all messages from the gateway's websocket and updates its events along with the events' observers pub async fn update_events(&mut self) { - while let Some(msg) = self.websocket.rx.lock().await.recv().await { + + while let Ok(msg) = self.websocket.rx.lock().await.try_recv() { + + if msg.to_string() == String::new() { + continue; + } + println!("Debug GW: Received WSE: {}", msg.to_string()); let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); @@ -151,6 +149,8 @@ impl<'a> Gateway<'a> { "STAGE_INSTANCE_CREATE" => {} "STAGE_INSTANCE_UPDATE" => {} "STAGE_INSTANCE_DELETE" => {} + // Not documented in discord docs, I assume this isnt for bots / apps but is for users? + "SESSIONS_REPLACE" => {} "TYPING_START" => { let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.user.typing_start_event.update_data(new_data); @@ -242,8 +242,8 @@ struct HeartbeatHandler { } impl HeartbeatHandler { - pub fn new(heartbeat_interval: u128, websocket_tx: Arc>>) -> HeartbeatHandler { - let (mut tx, mut rx) = mpsc::channel(32); + pub fn new(heartbeat_interval: u128, websocket_tx: Arc>, tokio_tungstenite::tungstenite::Message>>>) -> HeartbeatHandler { + let (tx, mut rx) = mpsc::channel(32); task::spawn(async move { let mut last_heartbeat: Instant = time::Instant::now(); @@ -252,8 +252,8 @@ impl HeartbeatHandler { loop { // If we received a seq number update, use that as the last seq number - let hb_communication: Option = rx.recv().await; - while hb_communication.is_some() { + let hb_communication: Result = rx.try_recv(); + if hb_communication.is_ok() { last_seq_number = Some(hb_communication.unwrap().d); } @@ -278,7 +278,6 @@ impl HeartbeatHandler { last_heartbeat = time::Instant::now(); } - } }); @@ -300,49 +299,35 @@ struct HeartbeatThreadCommunication { struct WebSocketConnection { rx: Arc>>, - tx: Arc>>, + tx: Arc>, tokio_tungstenite::tungstenite::Message>>>, } impl<'a> WebSocketConnection { async fn new(websocket_url: String) -> WebSocketConnection { - let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); - /*if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { - return Err(tokio_tungstenite::tungstenite::Error::Url( - UrlError::UnsupportedUrlScheme, - )); - }*/ - let (mut send_channel_write, mut send_channel_read): ( + let (receive_channel_write, receive_channel_read): ( Sender, Receiver, ) = channel(32); - let (mut receive_channel_write, mut receive_channel_read): ( - Sender, - Receiver, - ) = channel(32); - - let shared_send_channel_write = Arc::new(Mutex::new(send_channel_write)); let shared_receive_channel_read = Arc::new(Mutex::new(receive_channel_read)); + let (ws_stream, _) = match connect_async_tls_with_config( + &websocket_url, + None, + Some(Connector::NativeTls( + TlsConnector::builder().build().unwrap(), + )), + ) + .await + { + Ok(ws_stream) => ws_stream, + Err(e) => panic!("{:?}", e), + }; + + let (ws_tx, mut ws_rx) = ws_stream.split(); + task::spawn(async move { - let (mut ws_stream, _) = match connect_async_tls_with_config( - &websocket_url, - None, - Some(Connector::NativeTls( - TlsConnector::builder().build().unwrap(), - )), - ) - .await - { - Ok(ws_stream) => ws_stream, - Err(_) => return, /*return Err(tokio_tungstenite::tungstenite::Error::Io( - io::ErrorKind::ConnectionAborted.into(), - ))*/ - }; - - let (mut ws_tx, mut ws_rx) = ws_stream.split(); - loop { // Write received messages to the receive channel @@ -354,18 +339,11 @@ impl<'a> WebSocketConnection { .await .unwrap(); }; - - // Send messages from the send channel - let msg = send_channel_read.recv().await; - if msg.as_ref().is_some() { - let msg_unwrapped = msg.unwrap(); - ws_tx.send(msg_unwrapped).await.unwrap(); - } } }); WebSocketConnection { - tx: shared_send_channel_write, + tx: Arc::new(Mutex::new(ws_tx)), rx: shared_receive_channel_read, } } From 0fd7be51a84f2bcf3789e0a780e4afb730971818 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 12:04:46 +0200 Subject: [PATCH 22/34] Fix merge conflict --- src/api/policies/instance/instance.rs | 62 +++++++++++++-------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index f7a5653..4f86136 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -1,41 +1,37 @@ -pub mod instance { - use reqwest::Client; - use serde_json::from_str; +use reqwest::Client; +use serde_json::from_str; - use crate::errors::InstanceServerError; - use crate::{api::types::InstancePolicies, instance::Instance}; +use crate::errors::InstanceServerError; +use crate::{api::types::InstancePolicies, instance::Instance}; - impl<'a> Instance<'a> { - /** - Gets the instance policies schema. - # Errors - [`InstanceServerError`] - If the request fails. - */ - pub async fn instance_policies_schema( - &self, - ) -> Result { - let client = Client::new(); - let endpoint_url = self.urls.get_api().to_string() + "/policies/instance/"; - let request = match client.get(&endpoint_url).send().await { - Ok(result) => result, - Err(e) => { - return Err(InstanceServerError::RequestErrorError { - url: endpoint_url, - error: e.to_string(), - }); - } - }; - - if !request.status().as_str().starts_with('2') { - return Err(InstanceServerError::ReceivedErrorCodeError { - error_code: request.status().to_string(), +impl<'a> Instance<'a> { + /** + Gets the instance policies schema. + # Errors + [`InstanceServerError`] - If the request fails. + */ + pub async fn instance_policies_schema(&self) -> Result { + let client = Client::new(); + let endpoint_url = self.urls.get_api().to_string() + "/policies/instance/"; + let request = match client.get(&endpoint_url).send().await { + Ok(result) => result, + Err(e) => { + return Err(InstanceServerError::RequestErrorError { + url: endpoint_url, + error: e.to_string(), }); } + }; - let body = request.text().await.unwrap(); - let instance_policies_schema: InstancePolicies = from_str(&body).unwrap(); - Ok(instance_policies_schema) + if !request.status().as_str().starts_with('2') { + return Err(InstanceServerError::ReceivedErrorCodeError { + error_code: request.status().to_string(), + }); } + + let body = request.text().await.unwrap(); + let instance_policies_schema: InstancePolicies = from_str(&body).unwrap(); + Ok(instance_policies_schema) } } @@ -58,4 +54,4 @@ mod instance_policies_schema_test { let schema = test_instance.instance_policies_schema().await.unwrap(); println!("{:?}", schema); } -} +} \ No newline at end of file From e67acfab49c001ec1692359704365e425da8ab0c Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 12:05:58 +0200 Subject: [PATCH 23/34] Was still a merge conflict there --- src/api/policies/instance/instance.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index 4f86136..34af1c5 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -1,3 +1,4 @@ + use reqwest::Client; use serde_json::from_str; From fa05680552321e8e3bf3ecc8c955982d7f7debcd Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 12:44:11 +0200 Subject: [PATCH 24/34] Less spam debug log --- src/gateway.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 6f65f57..8220d43 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -53,7 +53,6 @@ impl<'a> Gateway<'a> { continue; } - println!("Debug GW: Received WSE: {}", msg.to_string()); let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes @@ -63,6 +62,8 @@ impl<'a> Gateway<'a> { 0 => { let gateway_payload_t = gateway_payload.t.unwrap(); + println!("GW: Received {}..", gateway_payload_t); + // See https://discord.com/developers/docs/topics/gateway-events#receive-events match gateway_payload_t.as_str() { "READY" => { @@ -172,11 +173,14 @@ impl<'a> Gateway<'a> { // Hello // Starts our heartbeat 10 => { + println!("GW: Received Hello"); let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone())); } // Heartbeat ACK - 11 => {} + 11 => { + println!("GW: Received Heartbeat ACK"); + } 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} } @@ -200,8 +204,6 @@ impl<'a> Gateway<'a> { let payload_json = serde_json::to_string(&gateway_payload).unwrap(); - println!("Debug GW: Sending WSE: {}", payload_json.clone()); - let message = tokio_tungstenite::tungstenite::Message::text(payload_json); self.websocket.tx.lock().await.send(message).await.unwrap(); @@ -212,6 +214,8 @@ impl<'a> Gateway<'a> { let to_send_value = serde_json::to_value(&to_send).unwrap(); + println!("GW: Sending Identify.."); + self.send_json_event(2, to_send_value).await; } @@ -220,6 +224,8 @@ impl<'a> Gateway<'a> { let to_send_value = serde_json::to_value(&to_send).unwrap(); + println!("GW: Sending Resume.."); + self.send_json_event(6, to_send_value).await; } @@ -259,7 +265,7 @@ impl HeartbeatHandler { if last_heartbeat.elapsed().as_millis() > heartbeat_interval { - println!("Debug GW: Sending Heartbeat"); + println!("GW: Sending Heartbeat.."); let heartbeat = GatewayHeartbeat { op: 1, From 6dfaa6e91fdaca41407aa6eff6e7c29054d4cb28 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Thu, 11 May 2023 22:47:31 +0200 Subject: [PATCH 25/34] Attempted reimpl --- src/api/auth/login.rs | 2 +- src/api/auth/register.rs | 2 +- src/gateway.rs | 475 ++++++++++++++++++++++----------------- src/instance.rs | 5 +- 4 files changed, 270 insertions(+), 214 deletions(-) diff --git a/src/api/auth/login.rs b/src/api/auth/login.rs index 2f77158..67647f7 100644 --- a/src/api/auth/login.rs +++ b/src/api/auth/login.rs @@ -8,7 +8,7 @@ pub mod login { use crate::errors::InstanceServerError; use crate::instance::Instance; - impl<'a> Instance<'a> { + impl Instance { pub async fn login_account( &mut self, login_schema: &LoginSchema, diff --git a/src/api/auth/register.rs b/src/api/auth/register.rs index 4a3c71a..186be78 100644 --- a/src/api/auth/register.rs +++ b/src/api/auth/register.rs @@ -8,7 +8,7 @@ pub mod register { instance::Instance, }; - impl<'a> Instance<'a> { + impl Instance { /** Registers a new user on the Spacebar server. # Arguments diff --git a/src/gateway.rs b/src/gateway.rs index 7f31fe2..b84e9b9 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -18,200 +18,51 @@ use tokio::time::Instant; use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::{WebSocketStream, Connector, connect_async_tls_with_config}; +#[derive(Debug)] /** -Represents a Gateway connection. A Gateway connection will create observable +Represents a handle to a Gateway connection. A Gateway connection will create observable [`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently implemented [Types] with the trait [`WebSocketEvent`] +Using this handle you can also send Gateway Events directly. */ -pub struct Gateway<'a> { +pub struct GatewayHandle { pub url: String, - pub events: Events<'a>, - websocket: WebSocketConnection, - heartbeat_handler: Option + pub events: Arc>, + pub websocket_tx: Arc>, tokio_tungstenite::tungstenite::Message>>>, } -impl<'a> Gateway<'a> { - pub async fn new( - websocket_url: String, - ) -> Result, tokio_tungstenite::tungstenite::Error> { - return Ok(Gateway { - url: websocket_url.clone(), - events: Events::default(), - websocket: WebSocketConnection::new(websocket_url).await, - heartbeat_handler: None, - }); - } - - /// This function reads all messages from the gateway's websocket and updates its events along with the events' observers - pub async fn update_events(&mut self) { - - while let Ok(msg) = self.websocket.rx.lock().await.try_recv() { - - if msg.to_string() == String::new() { - continue; - } - - let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); - - // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes - match gateway_payload.op { - // Dispatch - // An event was dispatched, we need to look at the gateway event name t - 0 => { - let gateway_payload_t = gateway_payload.t.unwrap(); - - println!("GW: Received {}..", gateway_payload_t); - - // See https://discord.com/developers/docs/topics/gateway-events#receive-events - match gateway_payload_t.as_str() { - "READY" => { - let data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - } - "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} - "AUTO_MODERATION_RULE_CREATE" => {} - "AUTO_MODERATION_RULE_UPDATE" => {} - "AUTO_MODERATION_RULE_DELETE" => {} - "AUTO_MODERATION_ACTION_EXECUTION" => {} - "CHANNEL_CREATE" => {} - "CHANNEL_UPDATE" => {} - "CHANNEL_DELETE" => {} - "CHANNEL_PINS_UPDATE" => {} - "THREAD_CREATE" => {} - "THREAD_UPDATE" => {} - "THREAD_DELETE" => {} - "THREAD_LIST_SYNC" => {} - "THREAD_MEMBER_UPDATE" => {} - "THREAD_MEMBERS_UPDATE" => {} - "GUILD_CREATE" => {} - "GUILD_UPDATE" => {} - "GUILD_DELETE" => {} - "GUILD_AUDIT_LOG_ENTRY_CREATE" => {} - "GUILD_BAN_ADD" => {} - "GUILD_BAN_REMOVE" => {} - "GUILD_EMOJIS_UPDATE" => {} - "GUILD_STICKERS_UPDATE" => {} - "GUILD_INTEGRATIONS_UPDATE" => {} - "GUILD_MEMBER_ADD" => {} - "GUILD_MEMBER_REMOVE" => {} - "GUILD_MEMBER_UPDATE" => {} - "GUILD_MEMBERS_CHUNK" => {} - "GUILD_ROLE_CREATE" => {} - "GUILD_ROLE_UPDATE" => {} - "GUILD_ROLE_DELETE" => {} - "GUILD_SCHEDULED_EVENT_CREATE" => {} - "GUILD_SCHEDULED_EVENT_UPDATE" => {} - "GUILD_SCHEDULED_EVENT_DELETE" => {} - "GUILD_SCHEDULED_EVENT_USER_ADD" => {} - "GUILD_SCHEDULED_EVENT_USER_REMOVE" => {} - "INTEGRATION_CREATE" => {} - "INTEGRATION_UPDATE" => {} - "INTEGRATION_DELETE" => {} - "INTERACTION_CREATE" => {} - "INVITE_CREATE" => {} - "INVITE_DELETE" => {} - "MESSAGE_CREATE" => { - let new_data: MessageCreate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.message.create.update_data(new_data); - } - "MESSAGE_UPDATE" => { - let new_data: MessageUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.message.update.update_data(new_data); - } - "MESSAGE_DELETE" => { - let new_data: MessageDelete = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.message.delete.update_data(new_data); - } - "MESSAGE_DELETE_BULK" => { - let new_data: MessageDeleteBulk = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.message.delete_bulk.update_data(new_data); - } - "MESSAGE_REACTION_ADD" => { - let new_data: MessageReactionAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.message.reaction_add.update_data(new_data); - } - "MESSAGE_REACTION_REMOVE" => { - let new_data: MessageReactionRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.message.reaction_remove.update_data(new_data); - } - "MESSAGE_REACTION_REMOVE_ALL" => { - let new_data: MessageReactionRemoveAll = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.message.reaction_remove_all.update_data(new_data); - } - "MESSAGE_REACTION_REMOVE_EMOJI" => { - let new_data: MessageReactionRemoveEmoji= serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.message.reaction_remove_emoji.update_data(new_data); - } - "PRESENCE_UPDATE" => { - let new_data: PresenceUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.user.presence_update.update_data(new_data); - } - "STAGE_INSTANCE_CREATE" => {} - "STAGE_INSTANCE_UPDATE" => {} - "STAGE_INSTANCE_DELETE" => {} - // Not documented in discord docs, I assume this isnt for bots / apps but is for users? - "SESSIONS_REPLACE" => {} - "TYPING_START" => { - let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.events.user.typing_start_event.update_data(new_data); - } - "USER_UPDATE" => {} - "VOICE_STATE_UPDATE" => {} - "VOICE_SERVER_UPDATE" => {} - "WEBHOOKS_UPDATE" => {} - _ => {panic!("Invalid gateway event ({})", &gateway_payload_t)} - } - } - // Heartbeat - // We received a heartbeat from the server - 1 => {} - // Reconnect - 7 => {todo!()} - // Invalid Session - 9 => {todo!()} - // Hello - // Starts our heartbeat - 10 => { - println!("GW: Received Hello"); - let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone())); - } - // Heartbeat ACK - 11 => { - println!("GW: Received Heartbeat ACK"); - } - 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} - _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} - } - - // If we have an active heartbeat thread and we received a seq number we should let it know - if gateway_payload.s.is_some() { - if self.heartbeat_handler.is_some() { - - let heartbeat_communication = HeartbeatThreadCommunication { op: gateway_payload.op, d: gateway_payload.s.unwrap() }; - - self.heartbeat_handler.as_mut().unwrap().tx.send(heartbeat_communication).await.unwrap(); - } - } - } - } - +impl GatewayHandle { /// Sends json to the gateway with an opcode async fn send_json_event(&self, op: u8, to_send: serde_json::Value) { + println!("1"); + let gateway_payload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; + println!("2"); + let payload_json = serde_json::to_string(&gateway_payload).unwrap(); + println!("3"); + let message = tokio_tungstenite::tungstenite::Message::text(payload_json); - self.websocket.tx.lock().await.send(message).await.unwrap(); + println!("4"); + + self.websocket_tx.lock().await.send(message).await.unwrap(); + + println!("5"); } /// Sends an identify event to the gateway pub async fn send_identify(&self, to_send: GatewayIdentifyPayload) { + println!("0.1"); + let to_send_value = serde_json::to_value(&to_send).unwrap(); + println!("0.2"); + println!("GW: Sending Identify.."); self.send_json_event(2, to_send_value).await; @@ -236,6 +87,209 @@ impl<'a> Gateway<'a> { } } +pub struct Gateway { + pub events: Arc>, + heartbeat_handler: Option, + pub websocket_tx: Arc>, tokio_tungstenite::tungstenite::Message>>> +} + +impl<'a> Gateway { + pub async fn new( + websocket_url: String, + ) -> Result { + + let (ws_stream, _) = match connect_async_tls_with_config( + &websocket_url, + None, + Some(Connector::NativeTls( + TlsConnector::builder().build().unwrap(), + )), + ) + .await + { + Ok(ws_stream) => ws_stream, + Err(e) => panic!("{:?}", e), + }; + + let (ws_tx, mut ws_rx) = ws_stream.split(); + + let shared_tx = Arc::new(Mutex::new(ws_tx)); + + let mut gateway = Gateway { events: Arc::new(Mutex::new(Events::default())), heartbeat_handler: None, websocket_tx: shared_tx.clone() }; + + let shared_events = gateway.events.clone(); + + task::spawn(async move { + loop { + println!("Waiting for next event.."); + let msg = ws_rx.next().await; + println!("Received event or sth"); + if msg.as_ref().is_some() { + let msg_unwrapped = msg.unwrap().unwrap(); + gateway.handle_event(msg_unwrapped).await; + println!("Handled the event"); + }; + } + }); + + return Ok(GatewayHandle { + url: websocket_url.clone(), + events: shared_events, + websocket_tx: shared_tx.clone(), + }); + } + + /// This handles a message as a websocket event and updates its events along with the events' observers + pub async fn handle_event(&mut self, msg: tokio_tungstenite::tungstenite::Message) { + + if msg.to_string() == String::new() { + return; + } + + let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); + + // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes + match gateway_payload.op { + // Dispatch + // An event was dispatched, we need to look at the gateway event name t + 0 => { + let gateway_payload_t = gateway_payload.t.unwrap(); + + println!("GW: Received {}..", gateway_payload_t); + + // See https://discord.com/developers/docs/topics/gateway-events#receive-events + match gateway_payload_t.as_str() { + "READY" => { + let data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + } + "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} + "AUTO_MODERATION_RULE_CREATE" => {} + "AUTO_MODERATION_RULE_UPDATE" => {} + "AUTO_MODERATION_RULE_DELETE" => {} + "AUTO_MODERATION_ACTION_EXECUTION" => {} + "CHANNEL_CREATE" => {} + "CHANNEL_UPDATE" => {} + "CHANNEL_DELETE" => {} + "CHANNEL_PINS_UPDATE" => {} + "THREAD_CREATE" => {} + "THREAD_UPDATE" => {} + "THREAD_DELETE" => {} + "THREAD_LIST_SYNC" => {} + "THREAD_MEMBER_UPDATE" => {} + "THREAD_MEMBERS_UPDATE" => {} + "GUILD_CREATE" => {} + "GUILD_UPDATE" => {} + "GUILD_DELETE" => {} + "GUILD_AUDIT_LOG_ENTRY_CREATE" => {} + "GUILD_BAN_ADD" => {} + "GUILD_BAN_REMOVE" => {} + "GUILD_EMOJIS_UPDATE" => {} + "GUILD_STICKERS_UPDATE" => {} + "GUILD_INTEGRATIONS_UPDATE" => {} + "GUILD_MEMBER_ADD" => {} + "GUILD_MEMBER_REMOVE" => {} + "GUILD_MEMBER_UPDATE" => {} + "GUILD_MEMBERS_CHUNK" => {} + "GUILD_ROLE_CREATE" => {} + "GUILD_ROLE_UPDATE" => {} + "GUILD_ROLE_DELETE" => {} + "GUILD_SCHEDULED_EVENT_CREATE" => {} + "GUILD_SCHEDULED_EVENT_UPDATE" => {} + "GUILD_SCHEDULED_EVENT_DELETE" => {} + "GUILD_SCHEDULED_EVENT_USER_ADD" => {} + "GUILD_SCHEDULED_EVENT_USER_REMOVE" => {} + "INTEGRATION_CREATE" => {} + "INTEGRATION_UPDATE" => {} + "INTEGRATION_DELETE" => {} + "INTERACTION_CREATE" => {} + "INVITE_CREATE" => {} + "INVITE_DELETE" => {} + "MESSAGE_CREATE" => { + let new_data: MessageCreate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.message.create.update_data(new_data).await; + } + "MESSAGE_UPDATE" => { + let new_data: MessageUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.message.update.update_data(new_data).await; + } + "MESSAGE_DELETE" => { + let new_data: MessageDelete = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.message.delete.update_data(new_data).await; + } + "MESSAGE_DELETE_BULK" => { + let new_data: MessageDeleteBulk = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.message.delete_bulk.update_data(new_data).await; + } + "MESSAGE_REACTION_ADD" => { + let new_data: MessageReactionAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.message.reaction_add.update_data(new_data).await; + } + "MESSAGE_REACTION_REMOVE" => { + let new_data: MessageReactionRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.message.reaction_remove.update_data(new_data).await; + } + "MESSAGE_REACTION_REMOVE_ALL" => { + let new_data: MessageReactionRemoveAll = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.message.reaction_remove_all.update_data(new_data).await; + } + "MESSAGE_REACTION_REMOVE_EMOJI" => { + let new_data: MessageReactionRemoveEmoji= serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.message.reaction_remove_emoji.update_data(new_data).await; + } + "PRESENCE_UPDATE" => { + let new_data: PresenceUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.user.presence_update.update_data(new_data).await; + } + "STAGE_INSTANCE_CREATE" => {} + "STAGE_INSTANCE_UPDATE" => {} + "STAGE_INSTANCE_DELETE" => {} + // Not documented in discord docs, I assume this isnt for bots / apps but is for users? + "SESSIONS_REPLACE" => {} + "TYPING_START" => { + let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.events.lock().await.user.typing_start_event.update_data(new_data).await; + } + "USER_UPDATE" => {} + "VOICE_STATE_UPDATE" => {} + "VOICE_SERVER_UPDATE" => {} + "WEBHOOKS_UPDATE" => {} + _ => {panic!("Invalid gateway event ({})", &gateway_payload_t)} + } + } + // Heartbeat + // We received a heartbeat from the server + 1 => {} + // Reconnect + 7 => {todo!()} + // Invalid Session + 9 => {todo!()} + // Hello + // Starts our heartbeat + 10 => { + println!("GW: Received Hello"); + let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket_tx.clone())); + } + // Heartbeat ACK + 11 => { + println!("GW: Received Heartbeat ACK"); + } + 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} + _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} + } + + // If we have an active heartbeat thread and we received a seq number we should let it know + if gateway_payload.s.is_some() { + if self.heartbeat_handler.is_some() { + + let heartbeat_communication = HeartbeatThreadCommunication { op: gateway_payload.op, d: gateway_payload.s.unwrap() }; + + self.heartbeat_handler.as_mut().unwrap().tx.send(heartbeat_communication).await.unwrap(); + } + } + } +} + /** Handles sending heartbeats to the gateway in another thread */ @@ -357,7 +411,7 @@ Trait which defines the behaviour of an Observer. An Observer is an object which an Observable. The Observer is notified when the Observable's data changes. In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. */ -pub trait Observer { +pub trait Observer: std::fmt::Debug { fn update(&self, data: &T); } @@ -365,14 +419,14 @@ pub trait Observer { change in the WebSocketEvent. GatewayEvents are observable. */ -#[derive(Default)] -pub struct GatewayEvent<'a, T: WebSocketEvent> { - observers: Vec<&'a dyn Observer>, +#[derive(Default, Debug)] +pub struct GatewayEvent { + observers: Vec + Sync + Send>>>, pub event_data: T, pub is_observed: bool, } -impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { +impl GatewayEvent { fn new(event_data: T) -> Self { Self { is_observed: false, @@ -395,7 +449,7 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { Returns an error if the GatewayEvent is already observed. Error type: [`ObserverError::AlreadySubscribedError`] */ - pub fn subscribe(&mut self, observable: &'a dyn Observer) -> Option { + pub fn subscribe(&mut self, observable: Arc + Sync + Send>>) -> Option { if self.is_observed { return Some(ObserverError::AlreadySubscribedError); } @@ -407,57 +461,59 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { /** Unsubscribes an Observer from the GatewayEvent. */ - pub fn unsubscribe(&mut self, observable: &'a dyn Observer) { + pub fn unsubscribe(&mut self, observable: Arc + Sync + Send>>) { // .retain()'s closure retains only those elements of the vector, which have a different // pointer value than observable. - self.observers.retain(|obs| !std::ptr::eq(*obs, observable)); + // The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same + // anddd there is no way to do that without using format + self.observers.retain(|obs| !(format!("{:?}", obs) == format!("{:?}", &observable))); self.is_observed = !self.observers.is_empty(); } /** Updates the GatewayEvent's data and notifies the observers. */ - fn update_data(&mut self, new_event_data: T) { + async fn update_data(&mut self, new_event_data: T) { self.event_data = new_event_data; - self.notify(); + self.notify().await; } /** Notifies the observers of the GatewayEvent. */ - fn notify(&self) { + async fn notify(&self) { for observer in &self.observers { - observer.update(&self.event_data); + observer.lock().await.update(&self.event_data); } } } mod events { use super::*; - #[derive(Default)] - pub struct Events<'a> { - pub message: Message<'a>, - pub user: User<'a>, - pub gateway_identify_payload: GatewayEvent<'a, GatewayIdentifyPayload>, - pub gateway_resume: GatewayEvent<'a, GatewayResume>, + #[derive(Default, Debug)] + pub struct Events { + pub message: Message, + pub user: User, + pub gateway_identify_payload: GatewayEvent, + pub gateway_resume: GatewayEvent, } - #[derive(Default)] - pub struct Message<'a> { - pub create: GatewayEvent<'a, MessageCreate>, - pub update: GatewayEvent<'a, MessageUpdate>, - pub delete: GatewayEvent<'a, MessageDelete>, - pub delete_bulk: GatewayEvent<'a, MessageDeleteBulk>, - pub reaction_add: GatewayEvent<'a, MessageReactionAdd>, - pub reaction_remove: GatewayEvent<'a, MessageReactionRemove>, - pub reaction_remove_all: GatewayEvent<'a, MessageReactionRemoveAll>, - pub reaction_remove_emoji: GatewayEvent<'a, MessageReactionRemoveEmoji>, + #[derive(Default, Debug)] + pub struct Message { + pub create: GatewayEvent, + pub update: GatewayEvent, + pub delete: GatewayEvent, + pub delete_bulk: GatewayEvent, + pub reaction_add: GatewayEvent, + pub reaction_remove: GatewayEvent, + pub reaction_remove_all: GatewayEvent, + pub reaction_remove_emoji: GatewayEvent, } - #[derive(Default)] - pub struct User<'a> { - pub presence_update: GatewayEvent<'a, PresenceUpdate>, - pub typing_start_event: GatewayEvent<'a, TypingStartEvent>, + #[derive(Default, Debug)] + pub struct User { + pub presence_update: GatewayEvent, + pub typing_start_event: GatewayEvent, } } @@ -466,6 +522,7 @@ mod example { use super::*; use crate::api::types::GatewayResume; + #[derive(Debug)] struct Consumer; impl Observer for Consumer { fn update(&self, data: &GatewayResume) { @@ -473,8 +530,8 @@ mod example { } } - #[test] - fn test_observer_behaviour() { + #[tokio::test] + async fn test_observer_behaviour() { let mut event = GatewayEvent::new(GatewayResume { token: "start".to_string(), session_id: "start".to_string(), @@ -489,15 +546,15 @@ mod example { let consumer = Consumer; - event.subscribe(&consumer); + event.subscribe(Arc::new(Mutex::new(consumer))); - event.notify(); + event.notify().await; - event.update_data(new_data); + event.update_data(new_data).await; let second_consumer = Consumer; - match event.subscribe(&second_consumer) { + match event.subscribe(Arc::new(Mutex::new(second_consumer))) { None => assert!(false), Some(err) => println!("You cannot subscribe twice: {}", err), } diff --git a/src/instance.rs b/src/instance.rs index ae8388a..bb54bfa 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -1,12 +1,13 @@ use crate::api::limits::Limits; use crate::api::types::{InstancePolicies}; use crate::errors::{FieldFormatError, InstanceServerError}; -use crate::gateway::Gateway; +use crate::gateway::{GatewayHandle, Gateway}; use crate::limit::LimitedRequester; use crate::URLBundle; use std::fmt; +#[derive(Debug)] /** The [`Instance`] what you will be using to perform all sorts of actions on the Spacebar server. @@ -16,7 +17,6 @@ pub struct Instance { pub instance_info: InstancePolicies, pub requester: LimitedRequester, pub limits: Limits, - pub gateway: Gateway, } impl Instance { @@ -45,7 +45,6 @@ impl Instance { ), limits: Limits::check_limits(urls.api).await, requester, - gateway: Gateway::new(urls.wss.clone()).await.unwrap(), }; instance.instance_info = match instance.instance_policies_schema().await { Ok(schema) => schema, From cef8fe9fdbd6da0a5a9965572479ca4642015630 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 09:05:20 +0200 Subject: [PATCH 26/34] Deserialization error.. --- src/api/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/types.rs b/src/api/types.rs index a182156..c91f53f 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -164,7 +164,7 @@ pub struct UserObject { premium: bool, purchased_flags: i32, premium_usage_flags: i32, - disabled: bool, + disabled: Option, } #[derive(Debug)] From 959bbac9bdbf0f5f48475bac46f8e9451c125c6b Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 09:47:12 +0200 Subject: [PATCH 27/34] Fix nested task issues --- src/gateway.rs | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index b84e9b9..969101a 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -108,7 +108,7 @@ impl<'a> Gateway { .await { Ok(ws_stream) => ws_stream, - Err(e) => panic!("{:?}", e), + Err(e) => return Err(e), }; let (ws_tx, mut ws_rx) = ws_stream.split(); @@ -119,15 +119,28 @@ impl<'a> Gateway { let shared_events = gateway.events.clone(); + // Wait for the first hello and then spawn both tasks so we avoid nested tasks + // This automatically spawns the heartbeat task, but from the main thread + let msg = ws_rx.next().await.unwrap().unwrap(); + let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); + + if gateway_payload.op != 10 { + println!("Recieved non hello on gateway init, what is happening?"); + return Err(tokio_tungstenite::tungstenite::Error::Protocol(tokio_tungstenite::tungstenite::error::ProtocolError::InvalidOpcode(gateway_payload.op))) + } + + println!("GW: Received Hello"); + + let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + gateway.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, shared_tx.clone())); + + // Now we can continously check for messages in a different task, since we aren't going to receive another hello task::spawn(async move { loop { - println!("Waiting for next event.."); let msg = ws_rx.next().await; - println!("Received event or sth"); if msg.as_ref().is_some() { let msg_unwrapped = msg.unwrap().unwrap(); gateway.handle_event(msg_unwrapped).await; - println!("Handled the event"); }; } }); @@ -265,10 +278,9 @@ impl<'a> Gateway { 9 => {todo!()} // Hello // Starts our heartbeat + // We should have already handled this in gateway init 10 => { - println!("GW: Received Hello"); - let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket_tx.clone())); + panic!("Recieved hello when it was unexpected"); } // Heartbeat ACK 11 => { @@ -328,8 +340,7 @@ impl HeartbeatHandler { let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); - websocket_tx.lock() - .await + websocket_tx.lock().await .send(msg) .await .unwrap(); From bcb97977f79343bef354e6699ef7d29517a28902 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 09:54:23 +0200 Subject: [PATCH 28/34] Remove terrible debug messages --- src/gateway.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 969101a..e0728e1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -35,34 +35,20 @@ impl GatewayHandle { /// Sends json to the gateway with an opcode async fn send_json_event(&self, op: u8, to_send: serde_json::Value) { - println!("1"); - let gateway_payload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; - println!("2"); - let payload_json = serde_json::to_string(&gateway_payload).unwrap(); - println!("3"); - let message = tokio_tungstenite::tungstenite::Message::text(payload_json); - println!("4"); - self.websocket_tx.lock().await.send(message).await.unwrap(); - - println!("5"); } /// Sends an identify event to the gateway pub async fn send_identify(&self, to_send: GatewayIdentifyPayload) { - println!("0.1"); - let to_send_value = serde_json::to_value(&to_send).unwrap(); - println!("0.2"); - println!("GW: Sending Identify.."); self.send_json_event(2, to_send_value).await; From 595716afc55b204f31df41a15406d46722df0aee Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 15:36:29 +0200 Subject: [PATCH 29/34] Small unit test update --- src/gateway.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index e0728e1..1996ff1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -542,23 +542,33 @@ mod example { }; let consumer = Consumer; + let arc_mut_consumer = Arc::new(Mutex::new(consumer)); - event.subscribe(Arc::new(Mutex::new(consumer))); + event.subscribe(arc_mut_consumer.clone()); event.notify().await; event.update_data(new_data).await; let second_consumer = Consumer; + let arc_mut_second_consumer = Arc::new(Mutex::new(second_consumer)); - match event.subscribe(Arc::new(Mutex::new(second_consumer))) { + match event.subscribe(arc_mut_second_consumer.clone()) { None => assert!(false), Some(err) => println!("You cannot subscribe twice: {}", err), } + + event.unsubscribe(arc_mut_consumer.clone()); + + match event.subscribe(arc_mut_second_consumer.clone()) { + None => assert!(true), + Some(err) => assert!(false), + } + } #[tokio::test] - async fn test_gateway() { + async fn test_gateway_establish() { let _gateway = Gateway::new("ws://localhost:3001/".to_string()) .await .unwrap(); From 11d371eee2993380ab542657baa1fe847c911aac Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 15:59:46 +0200 Subject: [PATCH 30/34] Add rest of send events --- src/api/types.rs | 24 ++++++++++++++++++++++++ src/gateway.rs | 22 ++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/api/types.rs b/src/api/types.rs index c91f53f..784cb73 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -803,6 +803,30 @@ pub struct GatewayReady { impl WebSocketEvent for GatewayReady {} +#[derive(Debug, Deserialize, Serialize, Default)] +/// See https://discord.com/developers/docs/topics/gateway-events#request-guild-members-request-guild-members-structure +pub struct GatewayRequestGuildMembers { + pub guild_id: String, + pub query: Option, + pub limit: u64, + pub presence: Option, + pub user_ids: Option, + pub nonce: Option, +} + +impl WebSocketEvent for GatewayRequestGuildMembers {} + +#[derive(Debug, Deserialize, Serialize, Default)] +/// See https://discord.com/developers/docs/topics/gateway-events#update-voice-state-gateway-voice-state-update-structure +pub struct GatewayVoiceStateUpdate { + pub guild_id: String, + pub channel_id: Option, + pub self_mute: bool, + pub self_deaf: bool, +} + +impl WebSocketEvent for GatewayVoiceStateUpdate {} + #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayHello { pub op: i32, diff --git a/src/gateway.rs b/src/gateway.rs index 1996ff1..964ac01 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -69,8 +69,30 @@ impl GatewayHandle { let to_send_value = serde_json::to_value(&to_send).unwrap(); + println!("GW: Sending Presence Update.."); + self.send_json_event(3, to_send_value).await; } + + /// Sends a Request Guild Members to the server + pub async fn send_request_guild_members(&self, to_send: GatewayRequestGuildMembers) { + + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + println!("GW: Sending Request Guild Members.."); + + self.send_json_event(8, to_send_value).await; + } + + /// Sends a Request Guild Members to the server + pub async fn send_update_voice_state(&self, to_send: GatewayVoiceStateUpdate) { + + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + println!("GW: Sending Voice State Update.."); + + self.send_json_event(4, to_send_value).await; + } } pub struct Gateway { From 41b75be2fb9fe65ef3c663ec1e87f2aa59514494 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 16:24:34 +0200 Subject: [PATCH 31/34] Add a few more gateway events --- Cargo.toml | 2 +- src/api/types.rs | 38 ++++++++++++++++++++++++++++++++++++++ src/gateway.rs | 20 ++++++++++++++++---- 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7f30de7..f346a21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ serde = {version = "1.0.159", features = ["derive"]} serde_json = "1.0.95" reqwest = {version = "0.11.16", features = ["multipart"]} url = "2.3.1" -chrono = "0.4.24" +chrono = {version = "0.4.24", features = ["serde"]} regex = "1.7.3" custom_error = "1.9.2" native-tls = "0.2.11" diff --git a/src/api/types.rs b/src/api/types.rs index 784cb73..fc55061 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -4,6 +4,7 @@ https://discord.com/developers/docs . I do not feel like re-documenting all of this, as everything is already perfectly explained there. */ +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use crate::{api::limits::Limits, instance::Instance}; @@ -857,6 +858,43 @@ pub struct GatewayHeartbeatAck { impl WebSocketEvent for GatewayHeartbeatAck {} +#[derive(Debug, Default, Deserialize, Serialize)] +/// See https://discord.com/developers/docs/topics/gateway-events#channel-pins-update +pub struct ChannelPinsUpdate { + pub guild_id: Option, + pub channel_id: String, + pub last_pin_timestamp: Option> +} + +impl WebSocketEvent for ChannelPinsUpdate {} + +#[derive(Debug, Default, Deserialize, Serialize)] +/// See https://discord.com/developers/docs/topics/gateway-events#guild-ban-add-guild-ban-add-event-fields +pub struct GuildBanAdd { + pub guild_id: String, + pub user: UserObject, +} + +impl WebSocketEvent for GuildBanAdd {} + +#[derive(Debug, Default, Deserialize, Serialize)] +/// See https://discord.com/developers/docs/topics/gateway-events#guild-ban-remove +pub struct GuildBanRemove { + pub guild_id: String, + pub user: UserObject, +} + +impl WebSocketEvent for GuildBanRemove {} + +#[derive(Debug, Default, Deserialize, Serialize)] +/// See https://discord.com/developers/docs/topics/gateway-events#user-update +/// Not directly serialized, as the inner payload is the user object +pub struct UserUpdate { + pub user: UserObject, +} + +impl WebSocketEvent for UserUpdate {} + #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayPayload { pub op: u8, diff --git a/src/gateway.rs b/src/gateway.rs index 964ac01..ca8bd6c 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -183,6 +183,7 @@ impl<'a> Gateway { "READY" => { let data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); } + "RESUMED" => {} "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} "AUTO_MODERATION_RULE_CREATE" => {} "AUTO_MODERATION_RULE_UPDATE" => {} @@ -200,10 +201,16 @@ impl<'a> Gateway { "THREAD_MEMBERS_UPDATE" => {} "GUILD_CREATE" => {} "GUILD_UPDATE" => {} - "GUILD_DELETE" => {} + "GUILD_DELETE" => { + let new_data: UnavailableGuild = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + } "GUILD_AUDIT_LOG_ENTRY_CREATE" => {} - "GUILD_BAN_ADD" => {} - "GUILD_BAN_REMOVE" => {} + "GUILD_BAN_ADD" => { + let new_data: GuildBanAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + } + "GUILD_BAN_REMOVE" => { + let new_data: GuildBanRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + } "GUILD_EMOJIS_UPDATE" => {} "GUILD_STICKERS_UPDATE" => {} "GUILD_INTEGRATIONS_UPDATE" => {} @@ -270,7 +277,11 @@ impl<'a> Gateway { let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.lock().await.user.typing_start_event.update_data(new_data).await; } - "USER_UPDATE" => {} + "USER_UPDATE" => { + let user: UserObject = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + let new_data = UserUpdate {user}; + self.events.lock().await.user.user_update.update_data(new_data).await; + } "VOICE_STATE_UPDATE" => {} "VOICE_SERVER_UPDATE" => {} "WEBHOOKS_UPDATE" => {} @@ -531,6 +542,7 @@ mod events { #[derive(Default, Debug)] pub struct User { + pub user_update: GatewayEvent, pub presence_update: GatewayEvent, pub typing_start_event: GatewayEvent, } From 4765b7a362ef80802f5a5a51a55b7f53122e86de Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 16:34:05 +0200 Subject: [PATCH 32/34] WebSocketConnection are no longer used --- src/gateway.rs | 51 -------------------------------------------------- 1 file changed, 51 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index ca8bd6c..74b55c0 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -385,57 +385,6 @@ struct HeartbeatThreadCommunication { d: u64 } -struct WebSocketConnection { - rx: Arc>>, - tx: Arc>, tokio_tungstenite::tungstenite::Message>>>, -} - -impl<'a> WebSocketConnection { - async fn new(websocket_url: String) -> WebSocketConnection { - let (receive_channel_write, receive_channel_read): ( - Sender, - Receiver, - ) = channel(32); - - let shared_receive_channel_read = Arc::new(Mutex::new(receive_channel_read)); - - let (ws_stream, _) = match connect_async_tls_with_config( - &websocket_url, - None, - Some(Connector::NativeTls( - TlsConnector::builder().build().unwrap(), - )), - ) - .await - { - Ok(ws_stream) => ws_stream, - Err(e) => panic!("{:?}", e), - }; - - let (ws_tx, mut ws_rx) = ws_stream.split(); - - task::spawn(async move { - loop { - - // Write received messages to the receive channel - let msg = ws_rx.next().await; - if msg.as_ref().is_some() { - let msg_unwrapped = msg.unwrap().unwrap(); - receive_channel_write - .send(msg_unwrapped) - .await - .unwrap(); - }; - } - }); - - WebSocketConnection { - tx: Arc::new(Mutex::new(ws_tx)), - rx: shared_receive_channel_read, - } - } -} - /** Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to an Observable. The Observer is notified when the Observable's data changes. From 9a80f687873306e437a8dee4ee6386deefa19115 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 16:35:42 +0200 Subject: [PATCH 33/34] Warnings --- src/gateway.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 74b55c0..05c043a 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -10,7 +10,7 @@ use native_tls::TlsConnector; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::Sender; use tokio::sync::Mutex; use tokio::task; use tokio::time; @@ -181,7 +181,7 @@ impl<'a> Gateway { // See https://discord.com/developers/docs/topics/gateway-events#receive-events match gateway_payload_t.as_str() { "READY" => { - let data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + let _data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); } "RESUMED" => {} "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} @@ -202,14 +202,14 @@ impl<'a> Gateway { "GUILD_CREATE" => {} "GUILD_UPDATE" => {} "GUILD_DELETE" => { - let new_data: UnavailableGuild = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + let _new_data: UnavailableGuild = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); } "GUILD_AUDIT_LOG_ENTRY_CREATE" => {} "GUILD_BAN_ADD" => { - let new_data: GuildBanAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + let _new_data: GuildBanAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); } "GUILD_BAN_REMOVE" => { - let new_data: GuildBanRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + let _new_data: GuildBanRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); } "GUILD_EMOJIS_UPDATE" => {} "GUILD_STICKERS_UPDATE" => {} @@ -545,7 +545,7 @@ mod example { match event.subscribe(arc_mut_second_consumer.clone()) { None => assert!(true), - Some(err) => assert!(false), + Some(_) => assert!(false), } } From a3969e8cb65acdc20921a9a64f35e86b91e98b63 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 16:43:29 +0200 Subject: [PATCH 34/34] Small changes for merging --- src/api/policies/instance/instance.rs | 2 +- src/gateway.rs | 2 +- src/instance.rs | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index 21a0c89..b864e54 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -53,4 +53,4 @@ mod instance_policies_schema_test { let _schema = test_instance.instance_policies_schema().await.unwrap(); } -} \ No newline at end of file +} diff --git a/src/gateway.rs b/src/gateway.rs index 05c043a..2d6eb06 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -101,7 +101,7 @@ pub struct Gateway { pub websocket_tx: Arc>, tokio_tungstenite::tungstenite::Message>>> } -impl<'a> Gateway { +impl Gateway { pub async fn new( websocket_url: String, ) -> Result { diff --git a/src/instance.rs b/src/instance.rs index bb54bfa..ee33961 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -1,14 +1,13 @@ use crate::api::limits::Limits; use crate::api::types::{InstancePolicies}; use crate::errors::{FieldFormatError, InstanceServerError}; -use crate::gateway::{GatewayHandle, Gateway}; use crate::limit::LimitedRequester; use crate::URLBundle; use std::fmt; -#[derive(Debug)] +#[derive(Debug)] /** The [`Instance`] what you will be using to perform all sorts of actions on the Spacebar server. */