From 284073deeb80cd151d0ea24fd35ce5d767c74fdc Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Thu, 4 May 2023 20:05:33 +0200 Subject: [PATCH 01/24] Add WebSocketConnection to Gateway --- src/gateway.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index 818d14b..d3c6e5f 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -34,6 +34,7 @@ pub struct Gateway<'a> { pub url: String, pub token: String, pub events: Events<'a>, + websocket: WebSocketConnection, } impl<'a> Gateway<'a> { @@ -42,9 +43,10 @@ impl<'a> Gateway<'a> { token: String, ) -> Result, tokio_tungstenite::tungstenite::Error> { return Ok(Gateway { - url: websocket_url, + url: websocket_url.clone(), token, events: Events::default(), + websocket: WebSocketConnection::new(websocket_url).await, }); } } From 0ff3a21423f919cc9edca0d747d696c90ea751e4 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Thu, 4 May 2023 20:57:45 +0200 Subject: [PATCH 02/24] Add basic event receiving --- src/gateway.rs | 100 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/src/gateway.rs b/src/gateway.rs index d3c6e5f..ed5c333 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -49,6 +49,106 @@ impl<'a> Gateway<'a> { websocket: WebSocketConnection::new(websocket_url).await, }); } + + /// This function reads all messages from the gateway's websocket and updates its events along with the events' observers + pub async fn update_events(&mut self) { + while let Some(msg) = self.websocket.rx.lock().await.recv().await { + let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); + + // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes + match gateway_payload.op { + // Dispatch + // An event was dispatched, we need to look at the gateway event name t + 0 => { + let gateway_payload_t = gateway_payload.t.unwrap(); + + // See https://discord.com/developers/docs/topics/gateway-events#receive-events + match gateway_payload_t.as_str() { + "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} + "AUTO_MODERATION_RULE_CREATE" => {} + "AUTO_MODERATION_RULE_UPDATE" => {} + "AUTO_MODERATION_RULE_DELETE" => {} + "AUTO_MODERATION_ACTION_EXECUTION" => {} + "CHANNEL_CREATE" => {} + "CHANNEL_UPDATE" => {} + "CHANNEL_DELETE" => {} + "CHANNEL_PINS_UPDATE" => {} + "THREAD_CREATE" => {} + "THREAD_UPDATE" => {} + "THREAD_DELETE" => {} + "THREAD_LIST_SYNC" => {} + "THREAD_MEMBER_UPDATE" => {} + "THREAD_MEMBERS_UPDATE" => {} + "GUILD_CREATE" => {} + "GUILD_UPDATE" => {} + "GUILD_DELETE" => {} + "GUILD_AUDIT_LOG_ENTRY_CREATE" => {} + "GUILD_BAN_ADD" => {} + "GUILD_BAN_REMOVE" => {} + "GUILD_EMOJIS_UPDATE" => {} + "GUILD_STICKERS_UPDATE" => {} + "GUILD_INTEGRATIONS_UPDATE" => {} + "GUILD_MEMBER_ADD" => {} + "GUILD_MEMBER_REMOVE" => {} + "GUILD_MEMBER_UPDATE" => {} + "GUILD_MEMBERS_CHUNK" => {} + "GUILD_ROLE_CREATE" => {} + "GUILD_ROLE_UPDATE" => {} + "GUILD_ROLE_DELETE" => {} + "GUILD_SCHEDULED_EVENT_CREATE" => {} + "GUILD_SCHEDULED_EVENT_UPDATE" => {} + "GUILD_SCHEDULED_EVENT_DELETE" => {} + "GUILD_SCHEDULED_EVENT_USER_ADD" => {} + "GUILD_SCHEDULED_EVENT_USER_REMOVE" => {} + "INTEGRATION_CREATE" => {} + "INTEGRATION_UPDATE" => {} + "INTEGRATION_DELETE" => {} + "INTERACTION_CREATE" => {} + "INVITE_CREATE" => {} + "INVITE_DELETE" => {} + "MESSAGE_CREATE" => {} + "MESSAGE_UPDATE" => {} + "MESSAGE_DELETE" => {} + "MESSAGE_DELETE_BULK" => {} + "MESSAGE_REACTION_ADD" => {} + "MESSAGE_REACTION_REMOVE" => {} + "MESSAGE_REACTION_REMOVE_ALL" => {} + "MESSAGE_REACTION_REMOVE_EMOJI" => {} + "PRESENCE_UPDATE" => {} + "STAGE_INSTANCE_CREATE" => {} + "STAGE_INSTANCE_UPDATE" => {} + "STAGE_INSTANCE_DELETE" => {} + "TYPING_START" => {} + "USER_UPDATE" => {} + "VOICE_STATE_UPDATE" => {} + "VOICE_SERVER_UPDATE" => {} + "WEBHOOKS_UPDATE" => {} + _ => {panic!("Invalid gateway event ({})", &gateway_payload_t)} + } + } + // Heartbeat + // We received a heartbeat from the server + 1 => { + let gateway_heartbeat: GatewayHeartbeat = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + } + // Reconnect + 7 => {todo!()} + // Invalid Session + 9 => {todo!()} + // Hello + // Should start our heartbeat + 10 => { + let gateway_hello: GatewayHello = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + } + // Heartbeat ACK + 11 => { + let gateway_hb_ack: GatewayHeartbeatAck = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + } + 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} + _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} + } + } + } } struct WebSocketConnection { From 28f3312cacf629ee41fed51d147dc74c94c39f98 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 15:39:11 +0200 Subject: [PATCH 03/24] No need to serialize this --- src/gateway.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index ed5c333..80cba24 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -128,9 +128,7 @@ impl<'a> Gateway<'a> { } // Heartbeat // We received a heartbeat from the server - 1 => { - let gateway_heartbeat: GatewayHeartbeat = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); - } + 1 => {} // Reconnect 7 => {todo!()} // Invalid Session @@ -138,12 +136,10 @@ impl<'a> Gateway<'a> { // Hello // Should start our heartbeat 10 => { - let gateway_hello: GatewayHello = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + let gateway_hello: HelloData = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); } // Heartbeat ACK - 11 => { - let gateway_hb_ack: GatewayHeartbeatAck = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); - } + 11 => {} 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} } From 2dda6f767e44fd1364aa74d80b2a67512f5d9cd9 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 15:46:45 +0200 Subject: [PATCH 04/24] Update data for existing events --- src/gateway.rs | 50 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 80cba24..012e764 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -106,19 +106,49 @@ impl<'a> Gateway<'a> { "INTERACTION_CREATE" => {} "INVITE_CREATE" => {} "INVITE_DELETE" => {} - "MESSAGE_CREATE" => {} - "MESSAGE_UPDATE" => {} - "MESSAGE_DELETE" => {} - "MESSAGE_DELETE_BULK" => {} - "MESSAGE_REACTION_ADD" => {} - "MESSAGE_REACTION_REMOVE" => {} - "MESSAGE_REACTION_REMOVE_ALL" => {} - "MESSAGE_REACTION_REMOVE_EMOJI" => {} - "PRESENCE_UPDATE" => {} + "MESSAGE_CREATE" => { + let new_data: MessageCreate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.create.update_data(new_data); + } + "MESSAGE_UPDATE" => { + let new_data: MessageUpdate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.update.update_data(new_data); + } + "MESSAGE_DELETE" => { + let new_data: MessageDelete = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.delete.update_data(new_data); + } + "MESSAGE_DELETE_BULK" => { + let new_data: MessageDeleteBulk = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.delete_bulk.update_data(new_data); + } + "MESSAGE_REACTION_ADD" => { + let new_data: MessageReactionAdd = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.reaction_add.update_data(new_data); + } + "MESSAGE_REACTION_REMOVE" => { + let new_data: MessageReactionRemove = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.reaction_remove.update_data(new_data); + } + "MESSAGE_REACTION_REMOVE_ALL" => { + let new_data: MessageReactionRemoveAll = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.reaction_remove_all.update_data(new_data); + } + "MESSAGE_REACTION_REMOVE_EMOJI" => { + let new_data: MessageReactionRemoveEmoji= serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.message.reaction_remove_emoji.update_data(new_data); + } + "PRESENCE_UPDATE" => { + let new_data: PresenceUpdate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.user.presence_update.update_data(new_data); + } "STAGE_INSTANCE_CREATE" => {} "STAGE_INSTANCE_UPDATE" => {} "STAGE_INSTANCE_DELETE" => {} - "TYPING_START" => {} + "TYPING_START" => { + let new_data: TypingStartEvent = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + self.events.user.typing_start_event.update_data(new_data); + } "USER_UPDATE" => {} "VOICE_STATE_UPDATE" => {} "VOICE_SERVER_UPDATE" => {} From ec2030794d8c9f7c85a9dfdbef725fa9f5662094 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 16:08:12 +0200 Subject: [PATCH 05/24] Add gateway Ready event --- src/api/types.rs | 18 ++++++++++++++++++ src/gateway.rs | 3 +++ 2 files changed, 21 insertions(+) diff --git a/src/api/types.rs b/src/api/types.rs index 2efbc1a..2bed6b5 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -134,6 +134,12 @@ pub struct Error { pub code: String, } +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct UnavailableGuild { + id: String, + unavailable: bool +} + #[derive(Serialize, Deserialize, Debug, Default)] pub struct UserObject { id: String, @@ -802,6 +808,18 @@ pub struct GatewayResume { impl WebSocketEvent for GatewayResume {} +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct GatewayReady { + pub v: u8, + pub user: UserObject, + pub guilds: Vec, + pub session_id: String, + pub resume_gateway_url: String, + pub shard: Option<(u64, u64)>, +} + +impl WebSocketEvent for GatewayReady {} + #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayHello { pub op: i32, diff --git a/src/gateway.rs b/src/gateway.rs index 012e764..83cf07c 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -64,6 +64,9 @@ impl<'a> Gateway<'a> { // See https://discord.com/developers/docs/topics/gateway-events#receive-events match gateway_payload_t.as_str() { + "READY" => { + let data: GatewayReady = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + } "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} "AUTO_MODERATION_RULE_CREATE" => {} "AUTO_MODERATION_RULE_UPDATE" => {} From 23472d01d954595ba1cd35b5e6e1de4920dffd4f Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 18:37:52 +0200 Subject: [PATCH 06/24] Update integers on some types --- src/api/types.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index 2bed6b5..40f7f8f 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -830,7 +830,7 @@ impl WebSocketEvent for GatewayHello {} #[derive(Debug, Default, Deserialize, Serialize)] pub struct HelloData { - pub heartbeat_interval: i32, + pub heartbeat_interval: u128, } impl WebSocketEvent for HelloData {} @@ -838,7 +838,7 @@ impl WebSocketEvent for HelloData {} #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayHeartbeat { pub op: u8, - pub d: u64, + pub d: Option, } impl WebSocketEvent for GatewayHeartbeat {} @@ -852,9 +852,9 @@ impl WebSocketEvent for GatewayHeartbeatAck {} #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayPayload { - pub op: i32, + pub op: u8, pub d: Option, - pub s: Option, + pub s: Option, pub t: Option, } From 85d79bb304bed699e5d771f8f53010015e0a5174 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 18:38:04 +0200 Subject: [PATCH 07/24] Experimental heartbeats --- src/gateway.rs | 82 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index 83cf07c..94beeb1 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -17,9 +17,12 @@ use serde::Serialize; use serde_json::from_str; use tokio::io; use tokio::net::TcpStream; +use tokio::sync::mpsc; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task; +use tokio::time; +use tokio::time::Instant; use tokio_tungstenite::tungstenite::error::UrlError; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; @@ -35,6 +38,7 @@ pub struct Gateway<'a> { pub token: String, pub events: Events<'a>, websocket: WebSocketConnection, + heartbeat_handler: Option } impl<'a> Gateway<'a> { @@ -47,6 +51,7 @@ impl<'a> Gateway<'a> { token, events: Events::default(), websocket: WebSocketConnection::new(websocket_url).await, + heartbeat_handler: None, }); } @@ -167,19 +172,94 @@ impl<'a> Gateway<'a> { // Invalid Session 9 => {todo!()} // Hello - // Should start our heartbeat + // Starts our heartbeat 10 => { let gateway_hello: HelloData = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone())); } // Heartbeat ACK 11 => {} 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} } + + // If we have an active heartbeat thread and we received a seq number we should let it know + if gateway_payload.s.is_some() { + if self.heartbeat_handler.is_some() { + + let heartbeat_communication = HeartbeatThreadCommunication { op: gateway_payload.op, d: gateway_payload.s.unwrap() }; + + self.heartbeat_handler.as_mut().unwrap().tx.send(heartbeat_communication).await.unwrap(); + } + } } } } +/** +Handles sending heartbeats to the gateway in another thread +*/ +struct HeartbeatHandler { + /// The heartbeat interval in milliseconds + heartbeat_interval: u128, + tx: Sender, +} + +impl HeartbeatHandler { + pub fn new(heartbeat_interval: u128, websocket_tx: Arc>>) -> HeartbeatHandler { + let (mut tx, mut rx) = mpsc::channel(32); + + task::spawn(async move { + let mut last_heartbeat: Instant = time::Instant::now(); + let mut last_seq_number: Option = None; + + loop { + + // If we received a seq number update, use that as the last seq number + let hb_communication: Option = rx.recv().await; + while hb_communication.is_some() { + last_seq_number = Some(hb_communication.unwrap().d); + } + + if last_heartbeat.elapsed().as_millis() > heartbeat_interval { + + let heartbeat = GatewayHeartbeat { + op: 1, + d: last_seq_number + }; + + let heartbeat_json = serde_json::to_string(&heartbeat).unwrap(); + + let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); + + websocket_tx.lock() + .await + .send(msg) + .await + .unwrap(); + + last_heartbeat = time::Instant::now(); + } + + } + }); + + Self { heartbeat_interval, tx } + } +} + +/** +Used to communicate with the main thread. +Either signifies a sequence number update or a received heartbeat ack +*/ +#[derive(Clone, Copy, Debug)] +struct HeartbeatThreadCommunication { + /// An opcode for the communication we received + op: u8, + /// The sequence number we got from discord + d: u64 +} + struct WebSocketConnection { rx: Arc>>, tx: Arc>>, From f6c9d5a80775f95e8186e4a1163b48a82005bd37 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 19:23:57 +0200 Subject: [PATCH 08/24] Experimental sending to gateway --- src/gateway.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/gateway.rs b/src/gateway.rs index 94beeb1..6241e2a 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -194,6 +194,42 @@ impl<'a> Gateway<'a> { } } } + + /// Sends json to the gateway with an opcode + async fn send_json_event(&self, op: u8, to_send: String) { + + let gateway_payload: GatewayPayload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; + + let payload_json = serde_json::to_string(&gateway_payload).unwrap(); + + let message = tokio_tungstenite::tungstenite::Message::text(payload_json); + + self.websocket.tx.lock().await.send(message).await.unwrap(); + } + + /// Sends an identify event to the gateway + pub async fn send_identify(&self, to_send: GatewayIdentifyPayload) { + + let to_send_json = serde_json::to_string(&to_send).unwrap(); + + self.send_json_event(2, to_send_json).await; + } + + /// Sends a resume event to the gateway + pub async fn send_resume(&self, to_send: GatewayResume) { + + let to_send_json = serde_json::to_string(&to_send).unwrap(); + + self.send_json_event(6, to_send_json).await; + } + + /// Sends an update presence event to the gateway + pub async fn send_update_presence(&self, to_send: PresenceUpdate) { + + let to_send_json = serde_json::to_string(&to_send).unwrap(); + + self.send_json_event(3, to_send_json).await; + } } /** From 6a99129fe770dd271e3e353f2ff2bdbe4c82d903 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 19:42:31 +0200 Subject: [PATCH 09/24] Add temp debug, remove unused token --- src/gateway.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 6241e2a..df9f230 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -35,7 +35,6 @@ implemented [Types] with the trait [`WebSocketEvent`] */ pub struct Gateway<'a> { pub url: String, - pub token: String, pub events: Events<'a>, websocket: WebSocketConnection, heartbeat_handler: Option @@ -44,11 +43,9 @@ pub struct Gateway<'a> { impl<'a> Gateway<'a> { pub async fn new( websocket_url: String, - token: String, ) -> Result, tokio_tungstenite::tungstenite::Error> { return Ok(Gateway { url: websocket_url.clone(), - token, events: Events::default(), websocket: WebSocketConnection::new(websocket_url).await, heartbeat_handler: None, @@ -58,6 +55,7 @@ impl<'a> Gateway<'a> { /// This function reads all messages from the gateway's websocket and updates its events along with the events' observers pub async fn update_events(&mut self) { while let Some(msg) = self.websocket.rx.lock().await.recv().await { + println!("Debug GW: Received WSE: {}", msg.to_string()); let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes @@ -202,6 +200,8 @@ impl<'a> Gateway<'a> { let payload_json = serde_json::to_string(&gateway_payload).unwrap(); + println!("Debug GW: Sending WSE: {}", payload_json.clone()); + let message = tokio_tungstenite::tungstenite::Message::text(payload_json); self.websocket.tx.lock().await.send(message).await.unwrap(); @@ -513,7 +513,7 @@ mod example { #[tokio::test] async fn test_gateway() { - let gateway = Gateway::new("ws://localhost:3001/".to_string(), "none".to_string()) + let gateway = Gateway::new("ws://localhost:3001/".to_string()) .await .unwrap(); } From 47c38c55415a5f4b57bfb12913d856472249cce9 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 20:26:51 +0200 Subject: [PATCH 10/24] Give websocket a send and receive channel --- src/gateway.rs | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index df9f230..08e6ad5 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -310,15 +310,25 @@ impl<'a> WebSocketConnection { )); }*/ - let (mut channel_write, mut channel_read): ( + let (mut send_channel_write, mut send_channel_read): ( Sender, Receiver, ) = channel(32); - let shared_channel_write = Arc::new(Mutex::new(channel_write)); - let clone_shared_channel_write = shared_channel_write.clone(); - let shared_channel_read = Arc::new(Mutex::new(channel_read)); - let clone_shared_channel_read = shared_channel_read.clone(); + let (mut receive_channel_write, mut receive_channel_read): ( + Sender, + Receiver, + ) = channel(32); + + let shared_send_channel_write = Arc::new(Mutex::new(send_channel_write)); + let shared_send_channel_read = Arc::new(Mutex::new(send_channel_read)); + + let clone_shared_send_channel_write = shared_send_channel_write.clone(); + + let shared_receive_channel_read = Arc::new(Mutex::new(receive_channel_read)); + let shared_receive_channel_write = Arc::new(Mutex::new(receive_channel_write)); + + let clone_shared_receive_channel_read = shared_receive_channel_read.clone(); task::spawn(async move { let (mut ws_stream, _) = match connect_async_tls_with_config( @@ -336,14 +346,14 @@ impl<'a> WebSocketConnection { ))*/ }; - let (mut write_tx, mut write_rx) = ws_stream.split(); + let (mut ws_tx, mut ws_rx) = ws_stream.split(); - while let Some(msg) = shared_channel_read.lock().await.recv().await { - write_tx.send(msg).await.unwrap(); + while let Some(msg) = shared_send_channel_read.lock().await.recv().await { + ws_tx.send(msg).await.unwrap(); } - let event = while let Some(msg) = write_rx.next().await { - shared_channel_write + let event = while let Some(msg) = ws_rx.next().await { + shared_receive_channel_write .lock() .await .send(msg.unwrap()) @@ -353,8 +363,8 @@ impl<'a> WebSocketConnection { }); WebSocketConnection { - tx: clone_shared_channel_write, - rx: clone_shared_channel_read, + tx: clone_shared_send_channel_write, + rx: clone_shared_receive_channel_read, } } } @@ -513,8 +523,18 @@ mod example { #[tokio::test] async fn test_gateway() { - let gateway = Gateway::new("ws://localhost:3001/".to_string()) + let mut gateway = Gateway::new("wss://localhost:3001/".to_string()) .await .unwrap(); + + let mut test_indetify = GatewayIdentifyPayload::default(); + test_indetify.intents = 1000; + test_indetify.token = "haaaaeeuuuggghh".to_string(); + test_indetify.properties = GatewayIdentifyConnectionProps { os: "Linux".to_string(), browser: "Very Poggers Discord Client".to_string(), device: "Some device idfk".to_string()}; + gateway.send_identify(test_indetify).await; + + loop { + gateway.update_events().await; + } } } From ab4c20562c665ccdb66570430f53e0221b2bf9f8 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 20:28:47 +0200 Subject: [PATCH 11/24] Remove the dumb test stuff I left in --- src/gateway.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 08e6ad5..9c961ed 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -523,18 +523,8 @@ mod example { #[tokio::test] async fn test_gateway() { - let mut gateway = Gateway::new("wss://localhost:3001/".to_string()) + let mut gateway = Gateway::new("ws://localhost:3001/".to_string()) .await .unwrap(); - - let mut test_indetify = GatewayIdentifyPayload::default(); - test_indetify.intents = 1000; - test_indetify.token = "haaaaeeuuuggghh".to_string(); - test_indetify.properties = GatewayIdentifyConnectionProps { os: "Linux".to_string(), browser: "Very Poggers Discord Client".to_string(), device: "Some device idfk".to_string()}; - gateway.send_identify(test_indetify).await; - - loop { - gateway.update_events().await; - } } } From 7ac4da8482128d549a8d71d9fe73e9bb430b14b6 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 20:58:00 +0200 Subject: [PATCH 12/24] Fix encoding wrong --- src/api/types.rs | 2 +- src/gateway.rs | 40 ++++++++++++++++++++-------------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index 40f7f8f..16cccda 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -853,7 +853,7 @@ impl WebSocketEvent for GatewayHeartbeatAck {} #[derive(Debug, Default, Deserialize, Serialize)] pub struct GatewayPayload { pub op: u8, - pub d: Option, + pub d: Option, pub s: Option, pub t: Option, } diff --git a/src/gateway.rs b/src/gateway.rs index 9c961ed..faba607 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -68,7 +68,7 @@ impl<'a> Gateway<'a> { // See https://discord.com/developers/docs/topics/gateway-events#receive-events match gateway_payload_t.as_str() { "READY" => { - let data: GatewayReady = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); } "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {} "AUTO_MODERATION_RULE_CREATE" => {} @@ -113,46 +113,46 @@ impl<'a> Gateway<'a> { "INVITE_CREATE" => {} "INVITE_DELETE" => {} "MESSAGE_CREATE" => { - let new_data: MessageCreate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageCreate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.create.update_data(new_data); } "MESSAGE_UPDATE" => { - let new_data: MessageUpdate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.update.update_data(new_data); } "MESSAGE_DELETE" => { - let new_data: MessageDelete = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageDelete = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.delete.update_data(new_data); } "MESSAGE_DELETE_BULK" => { - let new_data: MessageDeleteBulk = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageDeleteBulk = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.delete_bulk.update_data(new_data); } "MESSAGE_REACTION_ADD" => { - let new_data: MessageReactionAdd = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageReactionAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.reaction_add.update_data(new_data); } "MESSAGE_REACTION_REMOVE" => { - let new_data: MessageReactionRemove = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageReactionRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.reaction_remove.update_data(new_data); } "MESSAGE_REACTION_REMOVE_ALL" => { - let new_data: MessageReactionRemoveAll = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageReactionRemoveAll = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.reaction_remove_all.update_data(new_data); } "MESSAGE_REACTION_REMOVE_EMOJI" => { - let new_data: MessageReactionRemoveEmoji= serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: MessageReactionRemoveEmoji= serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.message.reaction_remove_emoji.update_data(new_data); } "PRESENCE_UPDATE" => { - let new_data: PresenceUpdate = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: PresenceUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.user.presence_update.update_data(new_data); } "STAGE_INSTANCE_CREATE" => {} "STAGE_INSTANCE_UPDATE" => {} "STAGE_INSTANCE_DELETE" => {} "TYPING_START" => { - let new_data: TypingStartEvent = serde_json::from_str(gateway_payload.d.unwrap().as_str()).unwrap(); + let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.user.typing_start_event.update_data(new_data); } "USER_UPDATE" => {} @@ -172,7 +172,7 @@ impl<'a> Gateway<'a> { // Hello // Starts our heartbeat 10 => { - let gateway_hello: HelloData = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap(); + let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone())); } // Heartbeat ACK @@ -194,9 +194,9 @@ impl<'a> Gateway<'a> { } /// Sends json to the gateway with an opcode - async fn send_json_event(&self, op: u8, to_send: String) { + async fn send_json_event(&self, op: u8, to_send: serde_json::Value) { - let gateway_payload: GatewayPayload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; + let gateway_payload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; let payload_json = serde_json::to_string(&gateway_payload).unwrap(); @@ -210,25 +210,25 @@ impl<'a> Gateway<'a> { /// Sends an identify event to the gateway pub async fn send_identify(&self, to_send: GatewayIdentifyPayload) { - let to_send_json = serde_json::to_string(&to_send).unwrap(); + let to_send_value = serde_json::to_value(&to_send).unwrap(); - self.send_json_event(2, to_send_json).await; + self.send_json_event(2, to_send_value).await; } /// Sends a resume event to the gateway pub async fn send_resume(&self, to_send: GatewayResume) { - let to_send_json = serde_json::to_string(&to_send).unwrap(); + let to_send_value = serde_json::to_value(&to_send).unwrap(); - self.send_json_event(6, to_send_json).await; + self.send_json_event(6, to_send_value).await; } /// Sends an update presence event to the gateway pub async fn send_update_presence(&self, to_send: PresenceUpdate) { - let to_send_json = serde_json::to_string(&to_send).unwrap(); + let to_send_value = serde_json::to_value(&to_send).unwrap(); - self.send_json_event(3, to_send_json).await; + self.send_json_event(3, to_send_value).await; } } From 22bfe1be074c0611489b03849175aa91992cae10 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 22:04:57 +0200 Subject: [PATCH 13/24] Readd gateway to instance --- src/api/auth/login.rs | 2 +- src/api/auth/register.rs | 2 +- src/api/policies/instance/instance.rs | 2 +- src/instance.rs | 11 ++++++----- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/api/auth/login.rs b/src/api/auth/login.rs index f7c4fe6..b419279 100644 --- a/src/api/auth/login.rs +++ b/src/api/auth/login.rs @@ -8,7 +8,7 @@ pub mod login { use crate::errors::InstanceServerError; use crate::instance::Instance; - impl Instance { + impl Instance<'_> { pub async fn login_account( &mut self, login_schema: &LoginSchema, diff --git a/src/api/auth/register.rs b/src/api/auth/register.rs index b932b9f..df7d8dc 100644 --- a/src/api/auth/register.rs +++ b/src/api/auth/register.rs @@ -8,7 +8,7 @@ pub mod register { instance::{Instance, Token}, }; - impl Instance { + impl Instance<'_> { /** Registers a new user on the Spacebar server. # Arguments diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index 7c1a48a..1c53b27 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -5,7 +5,7 @@ pub mod instance { use crate::errors::InstanceServerError; use crate::{api::types::InstancePolicies, instance::Instance}; - impl Instance { + impl Instance<'_> { /** Gets the instance policies schema. # Errors diff --git a/src/instance.rs b/src/instance.rs index 3e52e88..b1cb153 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -1,26 +1,26 @@ use crate::api::limits::Limits; use crate::api::types::{InstancePolicies, User}; use crate::errors::{FieldFormatError, InstanceServerError}; +use crate::gateway::Gateway; use crate::limit::LimitedRequester; use crate::URLBundle; use std::collections::HashMap; use std::fmt; -#[derive(Debug)] /** The [`Instance`] what you will be using to perform all sorts of actions on the Spacebar server. */ -pub struct Instance { +pub struct Instance<'a> { pub urls: URLBundle, pub instance_info: InstancePolicies, pub requester: LimitedRequester, pub limits: Limits, - //pub gateway: Gateway, + pub gateway: Gateway<'a>, pub users: HashMap, } -impl Instance { +impl Instance<'_> { /// Creates a new [`Instance`]. /// # Arguments /// * `urls` - The [`URLBundle`] that contains all the URLs that are needed to connect to the Spacebar server. @@ -30,7 +30,7 @@ impl Instance { pub async fn new( urls: URLBundle, requester: LimitedRequester, - ) -> Result { + ) -> Result, InstanceServerError> { let users: HashMap = HashMap::new(); let mut instance = Instance { urls: urls.clone(), @@ -48,6 +48,7 @@ impl Instance { limits: Limits::check_limits(urls.api).await, requester, users, + gateway: Gateway::new(urls.wss.clone()).await.unwrap(), }; instance.instance_info = match instance.instance_policies_schema().await { Ok(schema) => schema, From a4fdf181064e92f075f305ce0afc73870e64d6cb Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 5 May 2023 22:46:00 +0200 Subject: [PATCH 14/24] Fixed instance lifetime parameter --- src/api/auth/login.rs | 2 +- src/api/auth/register.rs | 2 +- src/api/policies/instance/instance.rs | 2 +- src/instance.rs | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/api/auth/login.rs b/src/api/auth/login.rs index b419279..23add11 100644 --- a/src/api/auth/login.rs +++ b/src/api/auth/login.rs @@ -8,7 +8,7 @@ pub mod login { use crate::errors::InstanceServerError; use crate::instance::Instance; - impl Instance<'_> { + impl<'a> Instance<'a> { pub async fn login_account( &mut self, login_schema: &LoginSchema, diff --git a/src/api/auth/register.rs b/src/api/auth/register.rs index df7d8dc..b4d4fd1 100644 --- a/src/api/auth/register.rs +++ b/src/api/auth/register.rs @@ -8,7 +8,7 @@ pub mod register { instance::{Instance, Token}, }; - impl Instance<'_> { + impl<'a> Instance<'a> { /** Registers a new user on the Spacebar server. # Arguments diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index 1c53b27..f7a5653 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -5,7 +5,7 @@ pub mod instance { use crate::errors::InstanceServerError; use crate::{api::types::InstancePolicies, instance::Instance}; - impl Instance<'_> { + impl<'a> Instance<'a> { /** Gets the instance policies schema. # Errors diff --git a/src/instance.rs b/src/instance.rs index b1cb153..5ebcbaa 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -20,7 +20,7 @@ pub struct Instance<'a> { pub users: HashMap, } -impl Instance<'_> { +impl<'a> Instance<'a> { /// Creates a new [`Instance`]. /// # Arguments /// * `urls` - The [`URLBundle`] that contains all the URLs that are needed to connect to the Spacebar server. @@ -30,7 +30,7 @@ impl Instance<'_> { pub async fn new( urls: URLBundle, requester: LimitedRequester, - ) -> Result, InstanceServerError> { + ) -> Result, InstanceServerError> { let users: HashMap = HashMap::new(); let mut instance = Instance { urls: urls.clone(), From 7dbdcf828de6a35df1517d180ee77bc3c4b47dec Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 08:39:41 +0200 Subject: [PATCH 15/24] Slight code cleanup --- src/gateway.rs | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index faba607..a1a8d7c 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -321,14 +321,7 @@ impl<'a> WebSocketConnection { ) = channel(32); let shared_send_channel_write = Arc::new(Mutex::new(send_channel_write)); - let shared_send_channel_read = Arc::new(Mutex::new(send_channel_read)); - - let clone_shared_send_channel_write = shared_send_channel_write.clone(); - let shared_receive_channel_read = Arc::new(Mutex::new(receive_channel_read)); - let shared_receive_channel_write = Arc::new(Mutex::new(receive_channel_write)); - - let clone_shared_receive_channel_read = shared_receive_channel_read.clone(); task::spawn(async move { let (mut ws_stream, _) = match connect_async_tls_with_config( @@ -348,14 +341,14 @@ impl<'a> WebSocketConnection { let (mut ws_tx, mut ws_rx) = ws_stream.split(); - while let Some(msg) = shared_send_channel_read.lock().await.recv().await { + // Send messages from the send channel + while let Some(msg) = send_channel_read.recv().await { ws_tx.send(msg).await.unwrap(); } - let event = while let Some(msg) = ws_rx.next().await { - shared_receive_channel_write - .lock() - .await + // Write received messages to the receive channel + while let Some(msg) = ws_rx.next().await { + receive_channel_write .send(msg.unwrap()) .await .unwrap(); @@ -363,8 +356,8 @@ impl<'a> WebSocketConnection { }); WebSocketConnection { - tx: clone_shared_send_channel_write, - rx: clone_shared_receive_channel_read, + tx: shared_send_channel_write, + rx: shared_receive_channel_read, } } } @@ -526,5 +519,9 @@ mod example { let mut gateway = Gateway::new("ws://localhost:3001/".to_string()) .await .unwrap(); + gateway.send_identify(GatewayIdentifyPayload::default()).await; + loop { + gateway.update_events().await; + } } } From 6fdff9772262624b0fe8dfce62a952415280ab5c Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 10:39:16 +0200 Subject: [PATCH 16/24] Update types to fix deserialization errors --- src/api/types.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/api/types.rs b/src/api/types.rs index 16cccda..cf0e0c4 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -151,10 +151,10 @@ pub struct UserObject { mfa_enabled: Option, banner: Option, accent_color: Option, - locale: String, + locale: Option, verified: Option, email: Option, - flags: i8, + flags: String, // Not sure why flags is a string, but real responses from the gateway give this is an integer in string format premium_type: Option, public_flags: Option, } @@ -814,7 +814,7 @@ pub struct GatewayReady { pub user: UserObject, pub guilds: Vec, pub session_id: String, - pub resume_gateway_url: String, + pub resume_gateway_url: Option, pub shard: Option<(u64, u64)>, } @@ -858,4 +858,4 @@ pub struct GatewayPayload { pub t: Option, } -impl WebSocketEvent for GatewayPayload {} +impl WebSocketEvent for GatewayPayload {} \ No newline at end of file From 172c00081306c935fc1265726576822ef7716589 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 10:39:58 +0200 Subject: [PATCH 17/24] Update Websocket to fix premature closing w 1006 --- src/gateway.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index a1a8d7c..dbf52b2 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -341,18 +341,25 @@ impl<'a> WebSocketConnection { let (mut ws_tx, mut ws_rx) = ws_stream.split(); - // Send messages from the send channel - while let Some(msg) = send_channel_read.recv().await { - ws_tx.send(msg).await.unwrap(); - } + loop { - // Write received messages to the receive channel - while let Some(msg) = ws_rx.next().await { - receive_channel_write - .send(msg.unwrap()) - .await - .unwrap(); - }; + // Write received messages to the receive channel + let msg = ws_rx.next().await; + if msg.as_ref().is_some() { + let msg_unwrapped = msg.unwrap().unwrap(); + receive_channel_write + .send(msg_unwrapped) + .await + .unwrap(); + }; + + // Send messages from the send channel + let msg = send_channel_read.recv().await; + if msg.as_ref().is_some() { + let msg_unwrapped = msg.unwrap(); + ws_tx.send(msg_unwrapped).await.unwrap(); + } + } }); WebSocketConnection { From 7d3077eec4c6ddf7bcb0a83c42a3ba49f1bfa1f9 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 11:13:07 +0200 Subject: [PATCH 18/24] Pub mod everything so it can be used as a library --- src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 00170d0..e7699e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ -mod api; -mod errors; -mod gateway; -mod instance; -mod limit; -mod voice; +pub mod api; +pub mod errors; +pub mod gateway; +pub mod instance; +pub mod limit; +pub mod voice; use url::{ParseError, Url}; #[derive(Clone, Default, Debug, PartialEq, Eq)] From 9f187b88642038f78325e2f4b8be6dc0ddd825f8 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 6 May 2023 11:14:38 +0200 Subject: [PATCH 19/24] Add a debug to heartbeat thread --- src/gateway.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/gateway.rs b/src/gateway.rs index dbf52b2..756f7f7 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -259,6 +259,8 @@ impl HeartbeatHandler { if last_heartbeat.elapsed().as_millis() > heartbeat_interval { + println!("Debug GW: Sending Heartbeat"); + let heartbeat = GatewayHeartbeat { op: 1, d: last_seq_number From fc93762cf7d1bfe296a6941e39146996afb07c8e Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 11:47:12 +0200 Subject: [PATCH 20/24] Fix deserialization error --- src/api/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/types.rs b/src/api/types.rs index cf0e0c4..456be43 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -740,7 +740,7 @@ pub struct PresenceUpdate { since: Option, activities: Vec, status: String, - afk: bool, + afk: Option, } impl WebSocketEvent for PresenceUpdate {} From b4888d2f702b83d34b9b6a1adf8ac53b5d0cd561 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 11:58:12 +0200 Subject: [PATCH 21/24] Rethink websockets, fix thread blocks --- src/gateway.rs | 92 +++++++++++++++++++------------------------------- 1 file changed, 35 insertions(+), 57 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 756f7f7..6f65f57 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,32 +1,24 @@ use std::sync::Arc; -use std::thread; use crate::api::types::*; use crate::api::WebSocketEvent; use crate::errors::ObserverError; use crate::gateway::events::Events; -use crate::URLBundle; -use futures_util::stream::{FilterMap, SplitSink, SplitStream}; use futures_util::SinkExt; use futures_util::StreamExt; +use futures_util::stream::SplitSink; use native_tls::TlsConnector; -use reqwest::Url; -use serde::Deserialize; -use serde::Serialize; -use serde_json::from_str; -use tokio::io; use tokio::net::TcpStream; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use tokio::task; use tokio::time; use tokio::time::Instant; -use tokio_tungstenite::tungstenite::error::UrlError; -use tokio_tungstenite::WebSocketStream; -use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; -use tokio_tungstenite::{Connector, MaybeTlsStream}; +use tokio_tungstenite::MaybeTlsStream; +use tokio_tungstenite::{WebSocketStream, Connector, connect_async_tls_with_config}; /** Represents a Gateway connection. A Gateway connection will create observable @@ -54,7 +46,13 @@ impl<'a> Gateway<'a> { /// This function reads all messages from the gateway's websocket and updates its events along with the events' observers pub async fn update_events(&mut self) { - while let Some(msg) = self.websocket.rx.lock().await.recv().await { + + while let Ok(msg) = self.websocket.rx.lock().await.try_recv() { + + if msg.to_string() == String::new() { + continue; + } + println!("Debug GW: Received WSE: {}", msg.to_string()); let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); @@ -151,6 +149,8 @@ impl<'a> Gateway<'a> { "STAGE_INSTANCE_CREATE" => {} "STAGE_INSTANCE_UPDATE" => {} "STAGE_INSTANCE_DELETE" => {} + // Not documented in discord docs, I assume this isnt for bots / apps but is for users? + "SESSIONS_REPLACE" => {} "TYPING_START" => { let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.events.user.typing_start_event.update_data(new_data); @@ -242,8 +242,8 @@ struct HeartbeatHandler { } impl HeartbeatHandler { - pub fn new(heartbeat_interval: u128, websocket_tx: Arc>>) -> HeartbeatHandler { - let (mut tx, mut rx) = mpsc::channel(32); + pub fn new(heartbeat_interval: u128, websocket_tx: Arc>, tokio_tungstenite::tungstenite::Message>>>) -> HeartbeatHandler { + let (tx, mut rx) = mpsc::channel(32); task::spawn(async move { let mut last_heartbeat: Instant = time::Instant::now(); @@ -252,8 +252,8 @@ impl HeartbeatHandler { loop { // If we received a seq number update, use that as the last seq number - let hb_communication: Option = rx.recv().await; - while hb_communication.is_some() { + let hb_communication: Result = rx.try_recv(); + if hb_communication.is_ok() { last_seq_number = Some(hb_communication.unwrap().d); } @@ -278,7 +278,6 @@ impl HeartbeatHandler { last_heartbeat = time::Instant::now(); } - } }); @@ -300,49 +299,35 @@ struct HeartbeatThreadCommunication { struct WebSocketConnection { rx: Arc>>, - tx: Arc>>, + tx: Arc>, tokio_tungstenite::tungstenite::Message>>>, } impl<'a> WebSocketConnection { async fn new(websocket_url: String) -> WebSocketConnection { - let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); - /*if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { - return Err(tokio_tungstenite::tungstenite::Error::Url( - UrlError::UnsupportedUrlScheme, - )); - }*/ - let (mut send_channel_write, mut send_channel_read): ( + let (receive_channel_write, receive_channel_read): ( Sender, Receiver, ) = channel(32); - let (mut receive_channel_write, mut receive_channel_read): ( - Sender, - Receiver, - ) = channel(32); - - let shared_send_channel_write = Arc::new(Mutex::new(send_channel_write)); let shared_receive_channel_read = Arc::new(Mutex::new(receive_channel_read)); + let (ws_stream, _) = match connect_async_tls_with_config( + &websocket_url, + None, + Some(Connector::NativeTls( + TlsConnector::builder().build().unwrap(), + )), + ) + .await + { + Ok(ws_stream) => ws_stream, + Err(e) => panic!("{:?}", e), + }; + + let (ws_tx, mut ws_rx) = ws_stream.split(); + task::spawn(async move { - let (mut ws_stream, _) = match connect_async_tls_with_config( - &websocket_url, - None, - Some(Connector::NativeTls( - TlsConnector::builder().build().unwrap(), - )), - ) - .await - { - Ok(ws_stream) => ws_stream, - Err(_) => return, /*return Err(tokio_tungstenite::tungstenite::Error::Io( - io::ErrorKind::ConnectionAborted.into(), - ))*/ - }; - - let (mut ws_tx, mut ws_rx) = ws_stream.split(); - loop { // Write received messages to the receive channel @@ -354,18 +339,11 @@ impl<'a> WebSocketConnection { .await .unwrap(); }; - - // Send messages from the send channel - let msg = send_channel_read.recv().await; - if msg.as_ref().is_some() { - let msg_unwrapped = msg.unwrap(); - ws_tx.send(msg_unwrapped).await.unwrap(); - } } }); WebSocketConnection { - tx: shared_send_channel_write, + tx: Arc::new(Mutex::new(ws_tx)), rx: shared_receive_channel_read, } } From 0fd7be51a84f2bcf3789e0a780e4afb730971818 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 12:04:46 +0200 Subject: [PATCH 22/24] Fix merge conflict --- src/api/policies/instance/instance.rs | 62 +++++++++++++-------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index f7a5653..4f86136 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -1,41 +1,37 @@ -pub mod instance { - use reqwest::Client; - use serde_json::from_str; +use reqwest::Client; +use serde_json::from_str; - use crate::errors::InstanceServerError; - use crate::{api::types::InstancePolicies, instance::Instance}; +use crate::errors::InstanceServerError; +use crate::{api::types::InstancePolicies, instance::Instance}; - impl<'a> Instance<'a> { - /** - Gets the instance policies schema. - # Errors - [`InstanceServerError`] - If the request fails. - */ - pub async fn instance_policies_schema( - &self, - ) -> Result { - let client = Client::new(); - let endpoint_url = self.urls.get_api().to_string() + "/policies/instance/"; - let request = match client.get(&endpoint_url).send().await { - Ok(result) => result, - Err(e) => { - return Err(InstanceServerError::RequestErrorError { - url: endpoint_url, - error: e.to_string(), - }); - } - }; - - if !request.status().as_str().starts_with('2') { - return Err(InstanceServerError::ReceivedErrorCodeError { - error_code: request.status().to_string(), +impl<'a> Instance<'a> { + /** + Gets the instance policies schema. + # Errors + [`InstanceServerError`] - If the request fails. + */ + pub async fn instance_policies_schema(&self) -> Result { + let client = Client::new(); + let endpoint_url = self.urls.get_api().to_string() + "/policies/instance/"; + let request = match client.get(&endpoint_url).send().await { + Ok(result) => result, + Err(e) => { + return Err(InstanceServerError::RequestErrorError { + url: endpoint_url, + error: e.to_string(), }); } + }; - let body = request.text().await.unwrap(); - let instance_policies_schema: InstancePolicies = from_str(&body).unwrap(); - Ok(instance_policies_schema) + if !request.status().as_str().starts_with('2') { + return Err(InstanceServerError::ReceivedErrorCodeError { + error_code: request.status().to_string(), + }); } + + let body = request.text().await.unwrap(); + let instance_policies_schema: InstancePolicies = from_str(&body).unwrap(); + Ok(instance_policies_schema) } } @@ -58,4 +54,4 @@ mod instance_policies_schema_test { let schema = test_instance.instance_policies_schema().await.unwrap(); println!("{:?}", schema); } -} +} \ No newline at end of file From e67acfab49c001ec1692359704365e425da8ab0c Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 12:05:58 +0200 Subject: [PATCH 23/24] Was still a merge conflict there --- src/api/policies/instance/instance.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/api/policies/instance/instance.rs b/src/api/policies/instance/instance.rs index 4f86136..34af1c5 100644 --- a/src/api/policies/instance/instance.rs +++ b/src/api/policies/instance/instance.rs @@ -1,3 +1,4 @@ + use reqwest::Client; use serde_json::from_str; From fa05680552321e8e3bf3ecc8c955982d7f7debcd Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sun, 7 May 2023 12:44:11 +0200 Subject: [PATCH 24/24] Less spam debug log --- src/gateway.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 6f65f57..8220d43 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -53,7 +53,6 @@ impl<'a> Gateway<'a> { continue; } - println!("Debug GW: Received WSE: {}", msg.to_string()); let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes @@ -63,6 +62,8 @@ impl<'a> Gateway<'a> { 0 => { let gateway_payload_t = gateway_payload.t.unwrap(); + println!("GW: Received {}..", gateway_payload_t); + // See https://discord.com/developers/docs/topics/gateway-events#receive-events match gateway_payload_t.as_str() { "READY" => { @@ -172,11 +173,14 @@ impl<'a> Gateway<'a> { // Hello // Starts our heartbeat 10 => { + println!("GW: Received Hello"); let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone())); } // Heartbeat ACK - 11 => {} + 11 => { + println!("GW: Received Heartbeat ACK"); + } 2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} _ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)} } @@ -200,8 +204,6 @@ impl<'a> Gateway<'a> { let payload_json = serde_json::to_string(&gateway_payload).unwrap(); - println!("Debug GW: Sending WSE: {}", payload_json.clone()); - let message = tokio_tungstenite::tungstenite::Message::text(payload_json); self.websocket.tx.lock().await.send(message).await.unwrap(); @@ -212,6 +214,8 @@ impl<'a> Gateway<'a> { let to_send_value = serde_json::to_value(&to_send).unwrap(); + println!("GW: Sending Identify.."); + self.send_json_event(2, to_send_value).await; } @@ -220,6 +224,8 @@ impl<'a> Gateway<'a> { let to_send_value = serde_json::to_value(&to_send).unwrap(); + println!("GW: Sending Resume.."); + self.send_json_event(6, to_send_value).await; } @@ -259,7 +265,7 @@ impl HeartbeatHandler { if last_heartbeat.elapsed().as_millis() > heartbeat_interval { - println!("Debug GW: Sending Heartbeat"); + println!("GW: Sending Heartbeat.."); let heartbeat = GatewayHeartbeat { op: 1,