From 0192f1452d6c0e69e784b74b45d74cb8fdc84541 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 2 Jun 2023 10:42:19 +0200 Subject: [PATCH 01/15] Handle Heartbeat requests --- src/gateway.rs | 94 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 85c3515..ef9905c 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1144,7 +1144,29 @@ impl Gateway { } } // We received a heartbeat from the server - GATEWAY_HEARTBEAT => {} + // "Discord may send the app a Heartbeat (opcode 1) event, in which case the app should send a Heartbeat event immediately." + GATEWAY_HEARTBEAT => { + println!("GW: Received Heartbeat // Heartbeat Request"); + + if self.heartbeat_handler.is_none() { + return; + } + + // Tell the heartbeat handler it should send a heartbeat right away + + let heartbeat_communication = HeartbeatThreadCommunication { + sequence_number: gateway_payload.sequence_number, + op_code: Some(GATEWAY_HEARTBEAT), + }; + + self.heartbeat_handler + .as_mut() + .unwrap() + .send + .send(heartbeat_communication) + .await + .unwrap(); + } GATEWAY_RECONNECT => { todo!() } @@ -1177,21 +1199,20 @@ impl Gateway { } // If we have an active heartbeat thread and we received a seq number we should let it know - if gateway_payload.sequence_number.is_some() { - if self.heartbeat_handler.is_some() { - let heartbeat_communication = HeartbeatThreadCommunication { - op_code: gateway_payload.op_code, - sequence_number: gateway_payload.sequence_number.unwrap(), - }; + if gateway_payload.sequence_number.is_some() && self.heartbeat_handler.is_some() { + let heartbeat_communication = HeartbeatThreadCommunication { + sequence_number: Some(gateway_payload.sequence_number.unwrap()), + // Op code is irrelevant here + op_code: None, + }; - self.heartbeat_handler - .as_mut() - .unwrap() - .send - .send(heartbeat_communication) - .await - .unwrap(); - } + self.heartbeat_handler + .as_mut() + .unwrap() + .send + .send(heartbeat_communication) + .await + .unwrap(); } } } @@ -1227,15 +1248,36 @@ impl HeartbeatHandler { let mut last_seq_number: Option = None; loop { - // If we received a seq number update, use that as the last seq number + let mut should_send; + + let time_to_send = + last_heartbeat_timestamp.elapsed().as_millis() >= heartbeat_interval; + + should_send = time_to_send; + let received_communication: Result = receive.try_recv(); if received_communication.is_ok() { - last_seq_number = Some(received_communication.unwrap().sequence_number); - } + let communication = received_communication.unwrap(); - let should_send = - last_heartbeat_timestamp.elapsed().as_millis() >= heartbeat_interval; + // If we received a seq number update, use that as the last seq number + if communication.sequence_number.is_some() { + last_seq_number = Some(communication.sequence_number.unwrap()); + } + + if communication.op_code.is_some() { + match communication.op_code.unwrap() { + GATEWAY_HEARTBEAT => { + // As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately + should_send = true; + } + GATEWAY_HEARTBEAT_ACK => { + todo!() + } + _ => {} + } + } + } if should_send { println!("GW: Sending Heartbeat.."); @@ -1265,15 +1307,15 @@ impl HeartbeatHandler { } /** -Used to communicate with the main thread. -Either signifies a sequence number update or a received heartbeat ack +Used for communications between the heartbeat and gateway thread. +Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server */ #[derive(Clone, Copy, Debug)] struct HeartbeatThreadCommunication { - /// The opcode for the communication we received - op_code: u8, - /// The sequence number we got from discord - sequence_number: u64, + /// The opcode for the communication we received, if relevant + op_code: Option, + /// The sequence number we got from discord, if any + sequence_number: Option, } /** From 554810862df3b240e895cc6c7f92471513ce6991 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 2 Jun 2023 10:57:47 +0200 Subject: [PATCH 02/15] Handle Heartbeat ACKs --- src/gateway.rs | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index ef9905c..0e5a10d 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -66,6 +66,9 @@ const GATEWAY_CALL_SYNC: u8 = 13; /// See [types::LazyRequest] const GATEWAY_LAZY_REQUEST: u8 = 14; +/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms +const HEARTBEAT_ACK_TIMEOUT: u128 = 2000; + #[derive(Debug)] /** Represents a handle to a Gateway connection. A Gateway connection will create observable @@ -1180,6 +1183,25 @@ impl Gateway { } GATEWAY_HEARTBEAT_ACK => { println!("GW: Received Heartbeat ACK"); + + if self.heartbeat_handler.is_none() { + return; + } + + // Tell the heartbeat handler we received an ack + + let heartbeat_communication = HeartbeatThreadCommunication { + sequence_number: gateway_payload.sequence_number, + op_code: Some(GATEWAY_HEARTBEAT_ACK), + }; + + self.heartbeat_handler + .as_mut() + .unwrap() + .send + .send(heartbeat_communication) + .await + .unwrap(); } GATEWAY_IDENTIFY | GATEWAY_UPDATE_PRESENCE @@ -1245,6 +1267,7 @@ impl HeartbeatHandler { let handle: JoinHandle<()> = task::spawn(async move { let mut last_heartbeat_timestamp: Instant = time::Instant::now(); + let mut last_heartbeat_acknowledged = true; let mut last_seq_number: Option = None; loop { @@ -1272,13 +1295,20 @@ impl HeartbeatHandler { should_send = true; } GATEWAY_HEARTBEAT_ACK => { - todo!() + // The server received our heartbeat + last_heartbeat_acknowledged = true; } _ => {} } } } + // If the server hasn't acknowledged our heartbeat we should resend it + if !last_heartbeat_acknowledged && last_heartbeat_timestamp.elapsed().as_millis() > HEARTBEAT_ACK_TIMEOUT { + should_send = true; + println!("GW: Timed out waiting for a heartbeat ack, resending"); + } + if should_send { println!("GW: Sending Heartbeat.."); @@ -1294,6 +1324,7 @@ impl HeartbeatHandler { websocket_tx.lock().await.send(msg).await.unwrap(); last_heartbeat_timestamp = time::Instant::now(); + last_heartbeat_acknowledged = false; } } }); From a4b21072abbcaafeee11bcd45d725c70829057a3 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 2 Jun 2023 10:58:04 +0200 Subject: [PATCH 03/15] fmt --- src/gateway.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index 0e5a10d..ae696f7 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1304,7 +1304,9 @@ impl HeartbeatHandler { } // If the server hasn't acknowledged our heartbeat we should resend it - if !last_heartbeat_acknowledged && last_heartbeat_timestamp.elapsed().as_millis() > HEARTBEAT_ACK_TIMEOUT { + if !last_heartbeat_acknowledged + && last_heartbeat_timestamp.elapsed().as_millis() > HEARTBEAT_ACK_TIMEOUT + { should_send = true; println!("GW: Timed out waiting for a heartbeat ack, resending"); } From 151ae4250acab4e2b20d9754ff8c95430d228cc3 Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Fri, 2 Jun 2023 11:00:34 +0200 Subject: [PATCH 04/15] Features is not always sent --- src/types/entities/guild.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/types/entities/guild.rs b/src/types/entities/guild.rs index 1b4d641..c177932 100644 --- a/src/types/entities/guild.rs +++ b/src/types/entities/guild.rs @@ -38,7 +38,7 @@ pub struct Guild { #[serde(default)] pub emojis: Vec, //#[cfg_attr(feature = "sqlx", sqlx(try_from = "String"))] - pub features: GuildFeaturesList, + pub features: Option, #[cfg_attr(feature = "sqlx", sqlx(skip))] pub application_id: Option, pub system_channel_id: Option, From 8f6c533e508509e3d0afcf69daa98172aa85d684 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Sun, 4 Jun 2023 13:49:35 +0200 Subject: [PATCH 05/15] Implement Clone for all events --- src/types/events/application.rs | 2 +- src/types/events/auto_moderation.rs | 8 +++--- src/types/events/call.rs | 8 +++--- src/types/events/channel.rs | 12 ++++---- src/types/events/guild.rs | 44 ++++++++++++++--------------- src/types/events/heartbeat.rs | 4 +-- src/types/events/hello.rs | 4 +-- src/types/events/identify.rs | 4 +-- src/types/events/integration.rs | 6 ++-- src/types/events/interaction.rs | 2 +- src/types/events/invite.rs | 4 +-- src/types/events/lazy_request.rs | 2 +- src/types/events/message.rs | 23 ++++++++------- src/types/events/passive_update.rs | 2 +- src/types/events/ready.rs | 12 ++++---- src/types/events/relationship.rs | 4 +-- src/types/events/request_members.rs | 2 +- src/types/events/resume.rs | 2 +- src/types/events/session.rs | 6 ++-- src/types/events/stage_instance.rs | 6 ++-- src/types/events/thread.rs | 12 ++++---- src/types/events/user.rs | 6 ++-- src/types/events/voice.rs | 6 ++-- src/types/events/webhooks.rs | 2 +- 24 files changed, 91 insertions(+), 92 deletions(-) diff --git a/src/types/events/application.rs b/src/types/events/application.rs index aca2b29..8afb374 100644 --- a/src/types/events/application.rs +++ b/src/types/events/application.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{GuildApplicationCommandPermissions, WebSocketEvent}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#application-command-permissions-update pub struct ApplicationCommandPermissionsUpdate { #[serde(flatten)] diff --git a/src/types/events/auto_moderation.rs b/src/types/events/auto_moderation.rs index 2fa8fc5..d82aa02 100644 --- a/src/types/events/auto_moderation.rs +++ b/src/types/events/auto_moderation.rs @@ -5,7 +5,7 @@ use crate::types::{ WebSocketEvent, }; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#auto-moderation-rule-create pub struct AutoModerationRuleCreate { #[serde(flatten)] @@ -14,7 +14,7 @@ pub struct AutoModerationRuleCreate { impl WebSocketEvent for AutoModerationRuleCreate {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#auto-moderation-rule-update pub struct AutoModerationRuleUpdate { #[serde(flatten)] @@ -23,7 +23,7 @@ pub struct AutoModerationRuleUpdate { impl WebSocketEvent for AutoModerationRuleUpdate {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#auto-moderation-rule-delete pub struct AutoModerationRuleDelete { #[serde(flatten)] @@ -32,7 +32,7 @@ pub struct AutoModerationRuleDelete { impl WebSocketEvent for AutoModerationRuleDelete {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#auto-moderation-action-execution pub struct AutoModerationActionExecution { pub guild_id: Snowflake, diff --git a/src/types/events/call.rs b/src/types/events/call.rs index 3c58635..7407ea6 100644 --- a/src/types/events/call.rs +++ b/src/types/events/call.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{VoiceState, WebSocketEvent}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// Is sent to a client by the server to signify a new being created /// {"t":"CALL_CREATE","s":2,"op":0,"d":{"voice_states":[],"ringing":[],"region":"milan","message_id":"1107187514906775613","embedded_activities":[],"channel_id":"837609115475771392"}} @@ -18,7 +18,7 @@ pub struct CallCreate { } impl WebSocketEvent for CallCreate {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// Updates the status of calls /// {"t":"CALL_UPDATE","s":5,"op":0,"d":{"ringing":["837606544539254834"],"region":"milan","message_id":"1107191540234846308","guild_id":null,"channel_id":"837609115475771392"}} @@ -32,7 +32,7 @@ pub struct CallUpdate { } impl WebSocketEvent for CallUpdate {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// Deletes a ringing call /// {"t":"CALL_DELETE","s":8,"op":0,"d":{"channel_id":"837609115475771392"}} @@ -41,7 +41,7 @@ pub struct CallDelete { } impl WebSocketEvent for CallDelete {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// See https://unofficial-discord-docs.vercel.app/gateway/op13 /// {"op":13,"d":{"channel_id":"837609115475771392"}} diff --git a/src/types/events/channel.rs b/src/types/events/channel.rs index adafa9f..9489027 100644 --- a/src/types/events/channel.rs +++ b/src/types/events/channel.rs @@ -3,7 +3,7 @@ use crate::types::events::WebSocketEvent; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#channel-pins-update pub struct ChannelPinsUpdate { pub guild_id: Option, @@ -13,7 +13,7 @@ pub struct ChannelPinsUpdate { impl WebSocketEvent for ChannelPinsUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#channel-create pub struct ChannelCreate { #[serde(flatten)] @@ -22,7 +22,7 @@ pub struct ChannelCreate { impl WebSocketEvent for ChannelCreate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#channel-update pub struct ChannelUpdate { #[serde(flatten)] @@ -31,7 +31,7 @@ pub struct ChannelUpdate { impl WebSocketEvent for ChannelUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// Officially undocumented. /// Sends updates to client about a new message with its id /// {"channel_unread_updates": [{"id": "816412869766938648", "last_message_id": "1085892012085104680"}} @@ -40,7 +40,7 @@ pub struct ChannelUnreadUpdate { pub guild_id: String, } -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// Contains very few fields from [Channel] /// See also [ChannelUnreadUpdates] pub struct ChannelUnreadUpdateObject { @@ -51,7 +51,7 @@ pub struct ChannelUnreadUpdateObject { impl WebSocketEvent for ChannelUnreadUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#channel-delete pub struct ChannelDelete { #[serde(flatten)] diff --git a/src/types/events/guild.rs b/src/types/events/guild.rs index d1c8fe4..34dbc01 100644 --- a/src/types/events/guild.rs +++ b/src/types/events/guild.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use super::PresenceUpdate; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-create /// This one is particularly painful, it can be a Guild object with an extra field or an unavailable guild object pub struct GuildCreate { @@ -14,7 +14,7 @@ pub struct GuildCreate { pub d: GuildCreateDataOption, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(untagged)] pub enum GuildCreateDataOption { UnavailableGuild(UnavailableGuild), @@ -28,7 +28,7 @@ impl Default for GuildCreateDataOption { } impl WebSocketEvent for GuildCreate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-ban-add-guild-ban-add-event-fields pub struct GuildBanAdd { pub guild_id: String, @@ -37,7 +37,7 @@ pub struct GuildBanAdd { impl WebSocketEvent for GuildBanAdd {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-ban-remove pub struct GuildBanRemove { pub guild_id: String, @@ -46,7 +46,7 @@ pub struct GuildBanRemove { impl WebSocketEvent for GuildBanRemove {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-update pub struct GuildUpdate { #[serde(flatten)] @@ -55,7 +55,7 @@ pub struct GuildUpdate { impl WebSocketEvent for GuildUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-delete pub struct GuildDelete { #[serde(flatten)] @@ -64,7 +64,7 @@ pub struct GuildDelete { impl WebSocketEvent for GuildDelete {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-audit-log-entry-create pub struct GuildAuditLogEntryCreate { #[serde(flatten)] @@ -73,7 +73,7 @@ pub struct GuildAuditLogEntryCreate { impl WebSocketEvent for GuildAuditLogEntryCreate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-emojis-update pub struct GuildEmojisUpdate { pub guild_id: String, @@ -82,7 +82,7 @@ pub struct GuildEmojisUpdate { impl WebSocketEvent for GuildEmojisUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-stickers-update pub struct GuildStickersUpdate { pub guild_id: String, @@ -91,7 +91,7 @@ pub struct GuildStickersUpdate { impl WebSocketEvent for GuildStickersUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-integrations-update pub struct GuildIntegrationsUpdate { pub guild_id: String, @@ -99,7 +99,7 @@ pub struct GuildIntegrationsUpdate { impl WebSocketEvent for GuildIntegrationsUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-member-add pub struct GuildMemberAdd { #[serde(flatten)] @@ -109,7 +109,7 @@ pub struct GuildMemberAdd { impl WebSocketEvent for GuildMemberAdd {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-member-remove pub struct GuildMemberRemove { pub guild_id: String, @@ -118,7 +118,7 @@ pub struct GuildMemberRemove { impl WebSocketEvent for GuildMemberRemove {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-member-update pub struct GuildMemberUpdate { pub guild_id: String, @@ -136,7 +136,7 @@ pub struct GuildMemberUpdate { impl WebSocketEvent for GuildMemberUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-members-chunk pub struct GuildMembersChunk { pub guild_id: String, @@ -150,7 +150,7 @@ pub struct GuildMembersChunk { impl WebSocketEvent for GuildMembersChunk {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-role-create pub struct GuildRoleCreate { pub guild_id: String, @@ -159,7 +159,7 @@ pub struct GuildRoleCreate { impl WebSocketEvent for GuildRoleCreate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-role-update pub struct GuildRoleUpdate { pub guild_id: String, @@ -168,7 +168,7 @@ pub struct GuildRoleUpdate { impl WebSocketEvent for GuildRoleUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-role-delete pub struct GuildRoleDelete { pub guild_id: String, @@ -177,7 +177,7 @@ pub struct GuildRoleDelete { impl WebSocketEvent for GuildRoleDelete {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-scheduled-event-create pub struct GuildScheduledEventCreate { #[serde(flatten)] @@ -186,7 +186,7 @@ pub struct GuildScheduledEventCreate { impl WebSocketEvent for GuildScheduledEventCreate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-scheduled-event-update pub struct GuildScheduledEventUpdate { #[serde(flatten)] @@ -195,7 +195,7 @@ pub struct GuildScheduledEventUpdate { impl WebSocketEvent for GuildScheduledEventUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-scheduled-event-delete pub struct GuildScheduledEventDelete { #[serde(flatten)] @@ -204,7 +204,7 @@ pub struct GuildScheduledEventDelete { impl WebSocketEvent for GuildScheduledEventDelete {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-scheduled-event-user-add pub struct GuildScheduledEventUserAdd { pub guild_scheduled_event_id: String, @@ -214,7 +214,7 @@ pub struct GuildScheduledEventUserAdd { impl WebSocketEvent for GuildScheduledEventUserAdd {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#guild-scheduled-event-user-remove pub struct GuildScheduledEventUserRemove { pub guild_scheduled_event_id: String, diff --git a/src/types/events/heartbeat.rs b/src/types/events/heartbeat.rs index be9e3f8..d824db9 100644 --- a/src/types/events/heartbeat.rs +++ b/src/types/events/heartbeat.rs @@ -1,7 +1,7 @@ use crate::types::events::WebSocketEvent; use serde::{Deserialize, Serialize}; -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] pub struct GatewayHeartbeat { pub op: u8, pub d: Option, @@ -9,7 +9,7 @@ pub struct GatewayHeartbeat { impl WebSocketEvent for GatewayHeartbeat {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] pub struct GatewayHeartbeatAck { pub op: i32, } diff --git a/src/types/events/hello.rs b/src/types/events/hello.rs index 214f211..85ef1f2 100644 --- a/src/types/events/hello.rs +++ b/src/types/events/hello.rs @@ -1,7 +1,7 @@ use crate::types::events::WebSocketEvent; use serde::{Deserialize, Serialize}; -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] pub struct GatewayHello { pub op: i32, pub d: HelloData, @@ -9,7 +9,7 @@ pub struct GatewayHello { impl WebSocketEvent for GatewayHello {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] pub struct HelloData { pub heartbeat_interval: u128, } diff --git a/src/types/events/identify.rs b/src/types/events/identify.rs index c4b55f4..0b33b89 100644 --- a/src/types/events/identify.rs +++ b/src/types/events/identify.rs @@ -2,7 +2,7 @@ use crate::types::events::{PresenceUpdate, WebSocketEvent}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] pub struct GatewayIdentifyPayload { pub token: String, pub properties: GatewayIdentifyConnectionProps, @@ -65,7 +65,7 @@ impl GatewayIdentifyPayload { impl WebSocketEvent for GatewayIdentifyPayload {} -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Clone)] #[serde_as] pub struct GatewayIdentifyConnectionProps { /// Almost always sent diff --git a/src/types/events/integration.rs b/src/types/events/integration.rs index 6569a72..9a38e83 100644 --- a/src/types/events/integration.rs +++ b/src/types/events/integration.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{Integration, WebSocketEvent}; -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#integration-create pub struct IntegrationCreate { #[serde(flatten)] @@ -12,7 +12,7 @@ pub struct IntegrationCreate { impl WebSocketEvent for IntegrationCreate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#integration-update pub struct IntegrationUpdate { #[serde(flatten)] @@ -22,7 +22,7 @@ pub struct IntegrationUpdate { impl WebSocketEvent for IntegrationUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#integration-delete pub struct IntegrationDelete { pub id: String, diff --git a/src/types/events/interaction.rs b/src/types/events/interaction.rs index 51513a3..e77ee7c 100644 --- a/src/types/events/interaction.rs +++ b/src/types/events/interaction.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{Interaction, WebSocketEvent}; -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#interaction-create pub struct InteractionCreate { #[serde(flatten)] diff --git a/src/types/events/invite.rs b/src/types/events/invite.rs index 4eb31a7..fc6c80d 100644 --- a/src/types/events/invite.rs +++ b/src/types/events/invite.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{GuildInvite, WebSocketEvent}; -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#invite-create pub struct InviteCreate { #[serde(flatten)] @@ -11,7 +11,7 @@ pub struct InviteCreate { impl WebSocketEvent for InviteCreate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#invite-delete pub struct InviteDelete { pub channel_id: String, diff --git a/src/types/events/lazy_request.rs b/src/types/events/lazy_request.rs index 2bf3038..9e54d77 100644 --- a/src/types/events/lazy_request.rs +++ b/src/types/events/lazy_request.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use super::WebSocketEvent; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// /// Sent to the server to signify lazy loading of a guild; diff --git a/src/types/events/message.rs b/src/types/events/message.rs index 3b8d8b1..f4ca606 100644 --- a/src/types/events/message.rs +++ b/src/types/events/message.rs @@ -1,11 +1,10 @@ -use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use crate::types::entities::{Emoji, GuildMember, Message, PublicUser}; use super::WebSocketEvent; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct TypingStartEvent { pub channel_id: String, pub guild_id: Option, @@ -16,7 +15,7 @@ pub struct TypingStartEvent { impl WebSocketEvent for TypingStartEvent {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#message-create pub struct MessageCreate { #[serde(flatten)] @@ -26,7 +25,7 @@ pub struct MessageCreate { mentions: Option>, } -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#message-create-message-create-extra-fields pub struct MessageCreateUser { #[serde(flatten)] @@ -36,7 +35,7 @@ pub struct MessageCreateUser { impl WebSocketEvent for MessageCreate {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct MessageUpdate { #[serde(flatten)] message: Message, @@ -47,7 +46,7 @@ pub struct MessageUpdate { impl WebSocketEvent for MessageUpdate {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct MessageDelete { id: String, channel_id: String, @@ -56,7 +55,7 @@ pub struct MessageDelete { impl WebSocketEvent for MessageDelete {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct MessageDeleteBulk { ids: Vec, channel_id: String, @@ -65,7 +64,7 @@ pub struct MessageDeleteBulk { impl WebSocketEvent for MessageDeleteBulk {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct MessageReactionAdd { user_id: String, channel_id: String, @@ -77,7 +76,7 @@ pub struct MessageReactionAdd { impl WebSocketEvent for MessageReactionAdd {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct MessageReactionRemove { user_id: String, channel_id: String, @@ -88,7 +87,7 @@ pub struct MessageReactionRemove { impl WebSocketEvent for MessageReactionRemove {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct MessageReactionRemoveAll { channel_id: String, message_id: String, @@ -97,7 +96,7 @@ pub struct MessageReactionRemoveAll { impl WebSocketEvent for MessageReactionRemoveAll {} -#[derive(Debug, Serialize, Deserialize, Default)] +#[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct MessageReactionRemoveEmoji { channel_id: String, message_id: String, @@ -107,7 +106,7 @@ pub struct MessageReactionRemoveEmoji { impl WebSocketEvent for MessageReactionRemoveEmoji {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// /// Not documented anywhere unofficially diff --git a/src/types/events/passive_update.rs b/src/types/events/passive_update.rs index 7417467..0d10354 100644 --- a/src/types/events/passive_update.rs +++ b/src/types/events/passive_update.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use super::{ChannelUnreadUpdateObject, WebSocketEvent}; use crate::types::{GuildMember, VoiceState}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// /// Seems to be passively set to update the client on guild details (though, why not just send the update events?) diff --git a/src/types/events/ready.rs b/src/types/events/ready.rs index c559f0b..e637b82 100644 --- a/src/types/events/ready.rs +++ b/src/types/events/ready.rs @@ -4,7 +4,7 @@ use crate::types::interfaces::ClientStatusObject; use crate::types::{Activity, GuildMember, PresenceUpdate, VoiceState}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Sort of documented, though most fields are left out /// For a full example see https://gist.github.com/kozabrada123/a347002b1fb8825a5727e40746d4e199 /// to:do add all undocumented fields @@ -27,7 +27,7 @@ pub struct GatewayReady { impl WebSocketEvent for GatewayReady {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// Sent after the READY event when a client is a user /// {"t":"READY_SUPPLEMENTAL","s":2,"op":0,"d":{"merged_presences":{"guilds":[[{"user_id":"463640391196082177","status":"online","game":null,"client_status":{"web":"online"},"activities":[]}]],"friends":[{"user_id":"463640391196082177","status":"online","last_modified":1684053508443,"client_status":{"web":"online"},"activities":[]}]},"merged_members":[[{"user_id":"463640391196082177","roles":[],"premium_since":null,"pending":false,"nick":"pog","mute":false,"joined_at":"2021-05-30T15:24:08.763000+00:00","flags":0,"deaf":false,"communication_disabled_until":null,"avatar":null}]],"lazy_private_channels":[],"guilds":[{"voice_states":[],"id":"848582562217590824","embedded_activities":[]}],"disclose":["pomelo"]}} @@ -43,13 +43,13 @@ pub struct GatewayReadySupplemental { impl WebSocketEvent for GatewayReadySupplemental {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct MergedPresences { pub guilds: Vec>, pub friends: Vec, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct MergedPresenceFriend { pub user_id: String, pub status: String, @@ -59,7 +59,7 @@ pub struct MergedPresenceFriend { pub activities: Vec, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct MergedPresenceGuild { pub user_id: String, pub status: String, @@ -69,7 +69,7 @@ pub struct MergedPresenceGuild { pub activities: Vec, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct SupplementalGuild { pub voice_states: Option>, pub id: String, diff --git a/src/types/events/relationship.rs b/src/types/events/relationship.rs index c9b79c0..6bcb84d 100644 --- a/src/types/events/relationship.rs +++ b/src/types/events/relationship.rs @@ -1,7 +1,7 @@ use crate::types::{events::WebSocketEvent, Relationship, RelationshipType, Snowflake}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://github.com/spacebarchat/server/issues/204 pub struct RelationshipAdd { #[serde(flatten)] @@ -11,7 +11,7 @@ pub struct RelationshipAdd { impl WebSocketEvent for RelationshipAdd {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://github.com/spacebarchat/server/issues/203 pub struct RelationshipRemove { pub id: Snowflake, diff --git a/src/types/events/request_members.rs b/src/types/events/request_members.rs index 67f37ce..c02cb8d 100644 --- a/src/types/events/request_members.rs +++ b/src/types/events/request_members.rs @@ -1,7 +1,7 @@ use crate::types::events::WebSocketEvent; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#request-guild-members-request-guild-members-structure pub struct GatewayRequestGuildMembers { pub guild_id: String, diff --git a/src/types/events/resume.rs b/src/types/events/resume.rs index 362de98..67ee7d2 100644 --- a/src/types/events/resume.rs +++ b/src/types/events/resume.rs @@ -1,7 +1,7 @@ use crate::types::events::WebSocketEvent; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct GatewayResume { pub token: String, pub session_id: String, diff --git a/src/types/events/session.rs b/src/types/events/session.rs index 26364d9..574c5fa 100644 --- a/src/types/events/session.rs +++ b/src/types/events/session.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{Activity, WebSocketEvent}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Officially Undocumented /// Seems like it sends active session info to users on connect /// [{"activities":[],"client_info":{"client":"web","os":"other","version":0},"session_id":"ab5941b50d818b1f8d93b4b1b581b192","status":"online"}] @@ -10,7 +10,7 @@ pub struct SessionsReplace { pub sessions: Vec, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Session info for the current user pub struct Session { pub activities: Vec, @@ -19,7 +19,7 @@ pub struct Session { pub status: String, } -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// Another Client info object /// {"client":"web","os":"other","version":0} // Note: I don't think this one exists yet? Though I might've made a mistake and this might be a duplicate diff --git a/src/types/events/stage_instance.rs b/src/types/events/stage_instance.rs index ea215f9..0fe487b 100644 --- a/src/types/events/stage_instance.rs +++ b/src/types/events/stage_instance.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{StageInstance, WebSocketEvent}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#stage-instance-create pub struct StageInstanceCreate { #[serde(flatten)] @@ -11,7 +11,7 @@ pub struct StageInstanceCreate { impl WebSocketEvent for StageInstanceCreate {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#stage-instance-update pub struct StageInstanceUpdate { #[serde(flatten)] @@ -20,7 +20,7 @@ pub struct StageInstanceUpdate { impl WebSocketEvent for StageInstanceUpdate {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#stage-instance-delete pub struct StageInstanceDelete { #[serde(flatten)] diff --git a/src/types/events/thread.rs b/src/types/events/thread.rs index 0cc5f91..e262331 100644 --- a/src/types/events/thread.rs +++ b/src/types/events/thread.rs @@ -2,7 +2,7 @@ use crate::types::entities::{Channel, ThreadMember}; use crate::types::events::WebSocketEvent; use serde::{Deserialize, Serialize}; -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#thread-create pub struct ThreadCreate { #[serde(flatten)] @@ -11,7 +11,7 @@ pub struct ThreadCreate { impl WebSocketEvent for ThreadCreate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#thread-update pub struct ThreadUpdate { #[serde(flatten)] @@ -20,7 +20,7 @@ pub struct ThreadUpdate { impl WebSocketEvent for ThreadUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#thread-delete pub struct ThreadDelete { #[serde(flatten)] @@ -29,7 +29,7 @@ pub struct ThreadDelete { impl WebSocketEvent for ThreadDelete {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#thread-list-sync pub struct ThreadListSync { pub guild_id: String, @@ -40,7 +40,7 @@ pub struct ThreadListSync { impl WebSocketEvent for ThreadListSync {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#thread-member-update /// The inner payload is a thread member object with an extra field. pub struct ThreadMemberUpdate { @@ -51,7 +51,7 @@ pub struct ThreadMemberUpdate { impl WebSocketEvent for ThreadMemberUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#thread-members-update pub struct ThreadMembersUpdate { pub id: String, diff --git a/src/types/events/user.rs b/src/types/events/user.rs index cfdae5c..01c1a4b 100644 --- a/src/types/events/user.rs +++ b/src/types/events/user.rs @@ -3,7 +3,7 @@ use crate::types::events::WebSocketEvent; use crate::types::utils::Snowflake; use serde::{Deserialize, Serialize}; -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#user-update pub struct UserUpdate { #[serde(flatten)] @@ -12,7 +12,7 @@ pub struct UserUpdate { impl WebSocketEvent for UserUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// Undocumented /// /// Possibly an update for muted guild / channel settings for the current user @@ -37,7 +37,7 @@ pub struct UserGuildSettingsUpdate { impl WebSocketEvent for UserGuildSettingsUpdate {} -#[derive(Debug, Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Clone)] /// Undocumented /// /// Received in [UserGuildSettingsUpdate] diff --git a/src/types/events/voice.rs b/src/types/events/voice.rs index 8af9c66..c7b4f62 100644 --- a/src/types/events/voice.rs +++ b/src/types/events/voice.rs @@ -1,7 +1,7 @@ use crate::types::{events::WebSocketEvent, VoiceState}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#update-voice-state /// /// Sent to the server @@ -16,7 +16,7 @@ pub struct UpdateVoiceState { impl WebSocketEvent for UpdateVoiceState {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#voice-state-update /// /// Received from the server @@ -29,7 +29,7 @@ pub struct VoiceStateUpdate { impl WebSocketEvent for VoiceStateUpdate {} -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#voice-server-update pub struct VoiceServerUpdate { pub token: String, diff --git a/src/types/events/webhooks.rs b/src/types/events/webhooks.rs index b88e1ee..3a93c60 100644 --- a/src/types/events/webhooks.rs +++ b/src/types/events/webhooks.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use super::WebSocketEvent; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Deserialize, Serialize, Default, Clone)] /// See https://discord.com/developers/docs/topics/gateway-events#webhooks-update pub struct WebhooksUpdate { pub guild_id: String, From a0629bf19814100e953f58143af388ca78315c61 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Thu, 8 Jun 2023 17:34:52 +0200 Subject: [PATCH 06/15] Gateway basic error handling --- src/errors.rs | 32 ++++++++ src/gateway.rs | 216 ++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 219 insertions(+), 29 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 749c8f7..a2843aa 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -31,3 +31,35 @@ custom_error! { pub ObserverError AlreadySubscribedError = "Each event can only be subscribed to once." } + +custom_error! { + /// For errors we receive from the gateway, see https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#gateway-close-event-codes; + /// + /// Supposed to be sent as numbers, though they are sent as string most of the time? + /// + /// Also includes errors when initiating a connection and unexpected opcodes + #[derive(PartialEq, Eq)] + pub GatewayError + // Errors we have received from the gateway + UnknownError = "We're not sure what went wrong. Try reconnecting?", + UnknownOpcodeError = "You sent an invalid Gateway opcode or an invalid payload for an opcode", + DecodeError = "Gateway server couldn't decode payload", + NotAuthenticatedError = "You sent a payload prior to identifying", + AuthenticationFailedError = "The account token sent with your identify payload is invalid", + AlreadyAuthenticatedError = "You've already identified, no need to reauthenticate", + InvalidSequenceNumberError = "The sequence number sent when resuming the session was invalid. Reconnect and start a new session", + RateLimitedError = "You are being rate limited!", + SessionTimedOutError = "Your session timed out. Reconnect and start a new one", + InvalidShardError = "You sent us an invalid shard when identifying", + ShardingRequiredError = "The session would have handled too many guilds - you are required to shard your connection in order to connect", + InvalidAPIVersionError = "You sent an invalid Gateway version", + InvalidIntentsError = "You sent an invalid intent", + DisallowedIntentsError = "You sent a disallowed intent. You may have tried to specify an intent that you have not enabled or are not approved for", + + // Errors when initiating a gateway connection + CannotConnectError{error: String} = "Cannot connect due to a tungstenite error: {error}", + NonHelloOnInitiateError{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong", + + // Other misc errors + UnexpectedOpcodeReceivedError{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}", +} diff --git a/src/gateway.rs b/src/gateway.rs index ae696f7..607fabf 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,3 +1,4 @@ +use crate::errors::GatewayError; use crate::errors::ObserverError; use crate::gateway::events::Events; use crate::types; @@ -69,6 +70,106 @@ const GATEWAY_LAZY_REQUEST: u8 = 14; /// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms const HEARTBEAT_ACK_TIMEOUT: u128 = 2000; +#[derive(Clone, Debug)] +/** +Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError]. +This struct is used internally when handling messages. +*/ +pub struct GatewayMessage { + /// The message we received from the server + message: tokio_tungstenite::tungstenite::Message, +} + +impl GatewayMessage { + /// Creates self from a tungstenite message + pub fn from_tungstenite_message(message: tokio_tungstenite::tungstenite::Message) -> Self { + Self { message } + } + + /// Parses the message as an error; + /// Returns the error if succesfully parsed, None if the message isn't an error + pub fn error(&self) -> Option { + let content = self.message.to_string(); + + // Some error strings have dots on the end, which we don't care about + let processed_content = content.clone().to_lowercase().replace(".", ""); + + match processed_content.as_str() { + "unknown error" | "4000" => { + return Some(GatewayError::UnknownError); + } + "unknown opcode" | "4001" => { + return Some(GatewayError::UnknownOpcodeError); + } + "decode error" | "4002" => { + return Some(GatewayError::DecodeError); + } + "not authenticated" | "4003" => { + return Some(GatewayError::NotAuthenticatedError); + } + "authentication failed" | "4004" => { + return Some(GatewayError::AuthenticationFailedError); + } + "already authenticated" | "4005" => { + return Some(GatewayError::AlreadyAuthenticatedError); + } + "invalid seq" | "4007" => { + return Some(GatewayError::InvalidSequenceNumberError); + } + "rate limited" | "4008" => { + return Some(GatewayError::RateLimitedError); + } + "session timed out" | "4009" => { + return Some(GatewayError::SessionTimedOutError); + } + "invalid shard" | "4010" => { + return Some(GatewayError::InvalidShardError); + } + "sharding required" | "4011" => { + return Some(GatewayError::ShardingRequiredError); + } + "invalid api version" | "4012" => { + return Some(GatewayError::InvalidAPIVersionError); + } + "invalid intent(s)" | "invalid intent" | "4013" => { + return Some(GatewayError::InvalidIntentsError); + } + "disallowed intent(s)" | "disallowed intents" | "4014" => { + return Some(GatewayError::DisallowedIntentsError); + } + _ => { + return None; + } + } + } + + /// Returns whether or not the message is an error + pub fn is_error(&self) -> bool { + return self.error().is_some(); + } + + /// Parses the message as a payload; + /// Returns a result of deserializing + pub fn payload(&self) -> Result { + return serde_json::from_str(self.message.to_text().unwrap()); + } + + /// Returns whether or not the message is a payload + pub fn is_payload(&self) -> bool { + // close messages are never payloads, payloads are only text messages + if self.message.is_close() | !self.message.is_text() { + return false; + } + + return self.payload().is_ok(); + } + + /// Returns whether or not the message is empty + pub fn is_empty(&self) -> bool { + return self.message.is_empty(); + } +} + #[derive(Debug)] /** Represents a handle to a Gateway connection. A Gateway connection will create observable @@ -88,6 +189,8 @@ pub struct GatewayHandle { >, >, pub handle: JoinHandle<()>, + /// Tells gateway tasks to close + kill_send: tokio::sync::broadcast::Sender<()>, } impl GatewayHandle { @@ -177,6 +280,12 @@ impl GatewayHandle { self.send_json_event(GATEWAY_LAZY_REQUEST, to_send_value) .await; } + + /// Closes the websocket connection and stops all gateway tasks + async fn close(&mut self) { + self.kill_send.send(()).unwrap(); + self.websocket_send.lock().await.close().await.unwrap(); + } } pub struct Gateway { @@ -190,12 +299,11 @@ pub struct Gateway { >, >, >, + kill_send: tokio::sync::broadcast::Sender<()>, } impl Gateway { - pub async fn new( - websocket_url: String, - ) -> Result { + pub async fn new(websocket_url: String) -> Result { let (websocket_stream, _) = match connect_async_tls_with_config( &websocket_url, None, @@ -207,34 +315,39 @@ impl Gateway { .await { Ok(websocket_stream) => websocket_stream, - Err(e) => return Err(e), + Err(e) => { + return Err(GatewayError::CannotConnectError { + error: e.to_string(), + }) + } }; - let (gateway_send, mut gateway_receive) = websocket_stream.split(); + let (websocket_send, mut websocket_receive) = websocket_stream.split(); - let shared_gateway_send = Arc::new(Mutex::new(gateway_send)); + let shared_websocket_send = Arc::new(Mutex::new(websocket_send)); + + // Create a shared broadcast channel for killing all gateway tasks + let (kill_send, mut kill_receive) = tokio::sync::broadcast::channel::<()>(16); let mut gateway = Gateway { events: Arc::new(Mutex::new(Events::default())), heartbeat_handler: None, - websocket_send: shared_gateway_send.clone(), + websocket_send: shared_websocket_send.clone(), + kill_send: kill_send.clone(), }; let shared_events = gateway.events.clone(); // Wait for the first hello and then spawn both tasks so we avoid nested tasks // This automatically spawns the heartbeat task, but from the main thread - let msg = gateway_receive.next().await.unwrap().unwrap(); + let msg = websocket_receive.next().await.unwrap().unwrap(); let gateway_payload: types::GatewayReceivePayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); if gateway_payload.op_code != GATEWAY_HELLO { - println!("Received non hello on gateway init, what is happening?"); - return Err(tokio_tungstenite::tungstenite::Error::Protocol( - tokio_tungstenite::tungstenite::error::ProtocolError::InvalidOpcode( - gateway_payload.op_code, - ), - )); + return Err(GatewayError::NonHelloOnInitiateError { + opcode: gateway_payload.op_code, + }); } println!("GW: Received Hello"); @@ -243,36 +356,69 @@ impl Gateway { serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap(); gateway.heartbeat_handler = Some(HeartbeatHandler::new( gateway_hello.heartbeat_interval, - shared_gateway_send.clone(), + shared_websocket_send.clone(), + kill_send.subscribe(), )); // Now we can continuously check for messages in a different task, since we aren't going to receive another hello let handle: JoinHandle<()> = task::spawn(async move { loop { - let msg = gateway_receive.next().await; + let msg = websocket_receive.next().await; + + // This if chain can be much better but if let is unstable on stable rust if msg.as_ref().is_some() { - let msg_unwrapped = msg.unwrap().unwrap(); - gateway.handle_event(msg_unwrapped).await; - }; + if msg.as_ref().unwrap().is_ok() { + let msg_unwrapped = msg.unwrap().unwrap(); + gateway + .handle_event(GatewayMessage::from_tungstenite_message(msg_unwrapped)) + .await; + + continue; + } + } + + // We couldn't receive the next message or it was an error, something is wrong with the websocket, close + println!("GW: Websocket is broken, stopping gateway"); + break; } }); return Ok(GatewayHandle { url: websocket_url.clone(), events: shared_events, - websocket_send: shared_gateway_send.clone(), + websocket_send: shared_websocket_send.clone(), handle, + kill_send: kill_send.clone(), }); } + /// Closes the websocket connection and stops all tasks + async fn close(&mut self) { + self.kill_send.send(()).unwrap(); + self.websocket_send.lock().await.close().await.unwrap(); + } + /// This handles a message as a websocket event and updates its events along with the events' observers - pub async fn handle_event(&mut self, msg: tokio_tungstenite::tungstenite::Message) { - if msg.to_string() == String::new() { + pub async fn handle_event(&mut self, msg: GatewayMessage) { + if msg.is_empty() { return; } - let gateway_payload: types::GatewayReceivePayload = - serde_json::from_str(msg.to_text().unwrap()).unwrap(); + // To:do: handle errors in a good way, maybe observers like events? + if msg.is_error() { + println!("GW: Received error, connection will close.."); + + let error = msg.error(); + + match error { + _ => {} + } + + self.close().await; + return; + } + + let gateway_payload = msg.payload().unwrap(); // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes match gateway_payload.op_code { @@ -1210,10 +1356,10 @@ impl Gateway { | GATEWAY_REQUEST_GUILD_MEMBERS | GATEWAY_CALL_SYNC | GATEWAY_LAZY_REQUEST => { - panic!( - "Received gateway op code that's meant to be sent, not received ({})", - gateway_payload.op_code - ) + let error = GatewayError::UnexpectedOpcodeReceivedError { + opcode: gateway_payload.op_code, + }; + Err::<(), GatewayError>(error).unwrap(); } _ => { println!("Received unrecognized gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code); @@ -1262,8 +1408,10 @@ impl HeartbeatHandler { >, >, >, + kill_rc: tokio::sync::broadcast::Receiver<()>, ) -> HeartbeatHandler { let (send, mut receive) = mpsc::channel(32); + let mut kill_receive = kill_rc.resubscribe(); let handle: JoinHandle<()> = task::spawn(async move { let mut last_heartbeat_timestamp: Instant = time::Instant::now(); @@ -1271,6 +1419,11 @@ impl HeartbeatHandler { let mut last_seq_number: Option = None; loop { + let should_shutdown = kill_receive.try_recv().is_ok(); + if should_shutdown { + break; + } + let mut should_send; let time_to_send = @@ -1323,7 +1476,12 @@ impl HeartbeatHandler { let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); - websocket_tx.lock().await.send(msg).await.unwrap(); + let send_result = websocket_tx.lock().await.send(msg).await; + if send_result.is_err() { + // We couldn't send, the websocket is broken + println!("GW: Couldnt send heartbeat, websocket seems broken"); + break; + } last_heartbeat_timestamp = time::Instant::now(); last_heartbeat_acknowledged = false; From da98f3c5818d864a48615425b66770788f288ac7 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Thu, 8 Jun 2023 18:24:11 +0200 Subject: [PATCH 07/15] Refactor --- src/gateway.rs | 274 ++++++++++++++++++++++++++----------------------- 1 file changed, 146 insertions(+), 128 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 607fabf..d826bea 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -3,12 +3,12 @@ use crate::errors::ObserverError; use crate::gateway::events::Events; use crate::types; use futures_util::stream::SplitSink; +use futures_util::stream::SplitStream; use futures_util::SinkExt; use futures_util::StreamExt; use native_tls::TlsConnector; use std::sync::Arc; use tokio::net::TcpStream; -use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::Sender; use tokio::sync::Mutex; @@ -282,7 +282,7 @@ impl GatewayHandle { } /// Closes the websocket connection and stops all gateway tasks - async fn close(&mut self) { + pub async fn close(&mut self) { self.kill_send.send(()).unwrap(); self.websocket_send.lock().await.close().await.unwrap(); } @@ -290,7 +290,7 @@ impl GatewayHandle { pub struct Gateway { pub events: Arc>, - heartbeat_handler: Option, + heartbeat_handler: HeartbeatHandler, pub websocket_send: Arc< Mutex< SplitSink< @@ -299,6 +299,7 @@ pub struct Gateway { >, >, >, + pub websocket_receive: SplitStream>>, kill_send: tokio::sync::broadcast::Sender<()>, } @@ -327,16 +328,7 @@ impl Gateway { let shared_websocket_send = Arc::new(Mutex::new(websocket_send)); // Create a shared broadcast channel for killing all gateway tasks - let (kill_send, mut kill_receive) = tokio::sync::broadcast::channel::<()>(16); - - let mut gateway = Gateway { - events: Arc::new(Mutex::new(Events::default())), - heartbeat_handler: None, - websocket_send: shared_websocket_send.clone(), - kill_send: kill_send.clone(), - }; - - let shared_events = gateway.events.clone(); + let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16); // Wait for the first hello and then spawn both tasks so we avoid nested tasks // This automatically spawns the heartbeat task, but from the main thread @@ -354,33 +346,24 @@ impl Gateway { let gateway_hello: types::HelloData = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap(); - gateway.heartbeat_handler = Some(HeartbeatHandler::new( - gateway_hello.heartbeat_interval, - shared_websocket_send.clone(), - kill_send.subscribe(), - )); + + let mut gateway = Gateway { + events: Arc::new(Mutex::new(Events::default())), + heartbeat_handler: HeartbeatHandler::new( + gateway_hello.heartbeat_interval, + shared_websocket_send.clone(), + kill_send.subscribe(), + ), + websocket_send: shared_websocket_send.clone(), + websocket_receive, + kill_send: kill_send.clone(), + }; + + let shared_events = gateway.events.clone(); // Now we can continuously check for messages in a different task, since we aren't going to receive another hello let handle: JoinHandle<()> = task::spawn(async move { - loop { - let msg = websocket_receive.next().await; - - // This if chain can be much better but if let is unstable on stable rust - if msg.as_ref().is_some() { - if msg.as_ref().unwrap().is_ok() { - let msg_unwrapped = msg.unwrap().unwrap(); - gateway - .handle_event(GatewayMessage::from_tungstenite_message(msg_unwrapped)) - .await; - - continue; - } - } - - // We couldn't receive the next message or it was an error, something is wrong with the websocket, close - println!("GW: Websocket is broken, stopping gateway"); - break; - } + gateway.gateway_listen_task().await; }); return Ok(GatewayHandle { @@ -392,6 +375,30 @@ impl Gateway { }); } + /// The main gateway listener task; + /// + /// Can only be stopped by closing the websocket, cannot be made to listen for kill + pub async fn gateway_listen_task(&mut self) { + loop { + let msg = self.websocket_receive.next().await; + + // This if chain can be much better but if let is unstable on stable rust + if msg.as_ref().is_some() { + if msg.as_ref().unwrap().is_ok() { + let msg_unwrapped = msg.unwrap().unwrap(); + self.handle_event(GatewayMessage::from_tungstenite_message(msg_unwrapped)) + .await; + + continue; + } + } + + // We couldn't receive the next message or it was an error, something is wrong with the websocket, close + println!("GW: Websocket is broken, stopping gateway"); + break; + } + } + /// Closes the websocket connection and stops all tasks async fn close(&mut self) { self.kill_send.send(()).unwrap(); @@ -1297,10 +1304,6 @@ impl Gateway { GATEWAY_HEARTBEAT => { println!("GW: Received Heartbeat // Heartbeat Request"); - if self.heartbeat_handler.is_none() { - return; - } - // Tell the heartbeat handler it should send a heartbeat right away let heartbeat_communication = HeartbeatThreadCommunication { @@ -1309,8 +1312,6 @@ impl Gateway { }; self.heartbeat_handler - .as_mut() - .unwrap() .send .send(heartbeat_communication) .await @@ -1330,10 +1331,6 @@ impl Gateway { GATEWAY_HEARTBEAT_ACK => { println!("GW: Received Heartbeat ACK"); - if self.heartbeat_handler.is_none() { - return; - } - // Tell the heartbeat handler we received an ack let heartbeat_communication = HeartbeatThreadCommunication { @@ -1342,8 +1339,6 @@ impl Gateway { }; self.heartbeat_handler - .as_mut() - .unwrap() .send .send(heartbeat_communication) .await @@ -1366,8 +1361,8 @@ impl Gateway { } } - // If we have an active heartbeat thread and we received a seq number we should let it know - if gateway_payload.sequence_number.is_some() && self.heartbeat_handler.is_some() { + // If we we received a seq number we should let it know + if gateway_payload.sequence_number.is_some() { let heartbeat_communication = HeartbeatThreadCommunication { sequence_number: Some(gateway_payload.sequence_number.unwrap()), // Op code is irrelevant here @@ -1375,8 +1370,6 @@ impl Gateway { }; self.heartbeat_handler - .as_mut() - .unwrap() .send .send(heartbeat_communication) .await @@ -1410,83 +1403,17 @@ impl HeartbeatHandler { >, kill_rc: tokio::sync::broadcast::Receiver<()>, ) -> HeartbeatHandler { - let (send, mut receive) = mpsc::channel(32); - let mut kill_receive = kill_rc.resubscribe(); + let (send, receive) = tokio::sync::mpsc::channel(32); + let kill_receive = kill_rc.resubscribe(); let handle: JoinHandle<()> = task::spawn(async move { - let mut last_heartbeat_timestamp: Instant = time::Instant::now(); - let mut last_heartbeat_acknowledged = true; - let mut last_seq_number: Option = None; - - loop { - let should_shutdown = kill_receive.try_recv().is_ok(); - if should_shutdown { - break; - } - - let mut should_send; - - let time_to_send = - last_heartbeat_timestamp.elapsed().as_millis() >= heartbeat_interval; - - should_send = time_to_send; - - let received_communication: Result = - receive.try_recv(); - if received_communication.is_ok() { - let communication = received_communication.unwrap(); - - // If we received a seq number update, use that as the last seq number - if communication.sequence_number.is_some() { - last_seq_number = Some(communication.sequence_number.unwrap()); - } - - if communication.op_code.is_some() { - match communication.op_code.unwrap() { - GATEWAY_HEARTBEAT => { - // As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately - should_send = true; - } - GATEWAY_HEARTBEAT_ACK => { - // The server received our heartbeat - last_heartbeat_acknowledged = true; - } - _ => {} - } - } - } - - // If the server hasn't acknowledged our heartbeat we should resend it - if !last_heartbeat_acknowledged - && last_heartbeat_timestamp.elapsed().as_millis() > HEARTBEAT_ACK_TIMEOUT - { - should_send = true; - println!("GW: Timed out waiting for a heartbeat ack, resending"); - } - - if should_send { - println!("GW: Sending Heartbeat.."); - - let heartbeat = types::GatewayHeartbeat { - op: GATEWAY_HEARTBEAT, - d: last_seq_number, - }; - - let heartbeat_json = serde_json::to_string(&heartbeat).unwrap(); - - let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); - - let send_result = websocket_tx.lock().await.send(msg).await; - if send_result.is_err() { - // We couldn't send, the websocket is broken - println!("GW: Couldnt send heartbeat, websocket seems broken"); - break; - } - - last_heartbeat_timestamp = time::Instant::now(); - last_heartbeat_acknowledged = false; - } - } + HeartbeatHandler::heartbeat_task( + websocket_tx, + heartbeat_interval, + receive, + kill_receive, + ) + .await; }); Self { @@ -1495,6 +1422,97 @@ impl HeartbeatHandler { handle, } } + + /// The main heartbeat task; + /// + /// Can be killed by the kill broadcast; + /// If the websocket is closed, will die out next time it tries to send a heartbeat; + pub async fn heartbeat_task( + websocket_tx: Arc< + Mutex< + SplitSink< + WebSocketStream>, + tokio_tungstenite::tungstenite::Message, + >, + >, + >, + heartbeat_interval: u128, + mut receive: tokio::sync::mpsc::Receiver, + mut kill_receive: tokio::sync::broadcast::Receiver<()>, + ) { + let mut last_heartbeat_timestamp: Instant = time::Instant::now(); + let mut last_heartbeat_acknowledged = true; + let mut last_seq_number: Option = None; + + loop { + let should_shutdown = kill_receive.try_recv().is_ok(); + if should_shutdown { + break; + } + + let mut should_send; + + let time_to_send = last_heartbeat_timestamp.elapsed().as_millis() >= heartbeat_interval; + + should_send = time_to_send; + + let received_communication: Result = + receive.try_recv(); + if received_communication.is_ok() { + let communication = received_communication.unwrap(); + + // If we received a seq number update, use that as the last seq number + if communication.sequence_number.is_some() { + last_seq_number = Some(communication.sequence_number.unwrap()); + } + + if communication.op_code.is_some() { + match communication.op_code.unwrap() { + GATEWAY_HEARTBEAT => { + // As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately + should_send = true; + } + GATEWAY_HEARTBEAT_ACK => { + // The server received our heartbeat + last_heartbeat_acknowledged = true; + } + _ => {} + } + } + } + + // If the server hasn't acknowledged our heartbeat we should resend it + if !last_heartbeat_acknowledged + && last_heartbeat_timestamp.elapsed().as_millis() > HEARTBEAT_ACK_TIMEOUT + { + should_send = true; + println!("GW: Timed out waiting for a heartbeat ack, resending"); + } + + if should_send { + println!("GW: Sending Heartbeat.."); + + let heartbeat = types::GatewayHeartbeat { + op: GATEWAY_HEARTBEAT, + d: last_seq_number, + }; + + let heartbeat_json = serde_json::to_string(&heartbeat).unwrap(); + + let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); + + let send_result = websocket_tx.lock().await.send(msg).await; + if send_result.is_err() { + // We couldn't send, the websocket is broken + println!("GW: Couldnt send heartbeat, websocket seems broken"); + break; + } + + last_heartbeat_timestamp = time::Instant::now(); + last_heartbeat_acknowledged = false; + } + } + } } /** From 88d571486b60318aec157fd45274da75f37da790 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Thu, 8 Jun 2023 19:51:32 +0200 Subject: [PATCH 08/15] Basic tests --- src/gateway.rs | 7 ------- tests/gateway.rs | 24 ++++++++++++++++++++++++ 2 files changed, 24 insertions(+), 7 deletions(-) create mode 100644 tests/gateway.rs diff --git a/src/gateway.rs b/src/gateway.rs index d826bea..f5afc38 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1820,11 +1820,4 @@ mod example { event.subscribe(arc_mut_second_consumer.clone()).unwrap(); } - - #[tokio::test] - async fn test_gateway_establish() { - let _gateway = Gateway::new("ws://localhost:3001/".to_string()) - .await - .unwrap(); - } } diff --git a/tests/gateway.rs b/tests/gateway.rs new file mode 100644 index 0000000..c6f46dd --- /dev/null +++ b/tests/gateway.rs @@ -0,0 +1,24 @@ +mod common; +use chorus::gateway::*; +use chorus::types; + +#[tokio::test] +/// Tests establishing a connection (hello and heartbeats) on the local gateway; +async fn test_gateway_establish() { + let bundle = common::setup().await; + + Gateway::new(bundle.urls.wss).await.unwrap(); +} + +#[tokio::test] +/// Tests establishing a connection and authenticating +async fn test_gateway_authenticate() { + let bundle = common::setup().await; + + let gateway = Gateway::new(bundle.urls.wss).await.unwrap(); + + let mut identify = types::GatewayIdentifyPayload::common(); + identify.token = bundle.user.token; + + gateway.send_identify(identify).await; +} From cd52c5ac3a5207a998973f2ad4a639d09a6e6bfb Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Fri, 9 Jun 2023 18:11:01 +0200 Subject: [PATCH 09/15] Cargo fix --- examples/gateway_observers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/gateway_observers.rs b/examples/gateway_observers.rs index e4c78e1..0920d61 100644 --- a/examples/gateway_observers.rs +++ b/examples/gateway_observers.rs @@ -17,7 +17,7 @@ pub struct ExampleObserver {} // One struct can be an observer of multiple websocketevents, if needed impl Observer for ExampleObserver { // After we subscribe to an event this function is called every time we receive it - fn update(&self, data: &GatewayReady) { + fn update(&self, _data: &GatewayReady) { println!("Observed Ready!"); } } From dc2fc90414e0fa4f6f61104c9d8c51971e3e68fb Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Fri, 9 Jun 2023 18:38:36 +0200 Subject: [PATCH 10/15] Discord.com sends premissions as an integer --- src/types/entities/role.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/types/entities/role.rs b/src/types/entities/role.rs index ba4a2fe..63e99e3 100644 --- a/src/types/entities/role.rs +++ b/src/types/entities/role.rs @@ -1,6 +1,6 @@ use bitflags::bitflags; use serde::{Deserialize, Serialize}; -use serde_aux::prelude::deserialize_option_number_from_string; +use serde_aux::prelude::{deserialize_option_number_from_string, deserialize_string_from_number}; use crate::types::utils::Snowflake; @@ -16,6 +16,7 @@ pub struct RoleObject { pub unicode_emoji: Option, pub position: u16, #[serde(default)] + #[serde(deserialize_with = "deserialize_string_from_number")] pub permissions: String, pub managed: bool, pub mentionable: bool, From a278b63ecbe6a28c2bbf797e7ea76823eeb8ea57 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Fri, 9 Jun 2023 20:22:59 +0200 Subject: [PATCH 11/15] Docs + unneeded &mut --- src/gateway.rs | 16 +++++++++++++--- src/types/events/call.rs | 27 +++++++++++++++------------ src/types/events/guild.rs | 32 +++++++++++++++++++++----------- src/types/events/hello.rs | 4 ++++ src/types/events/ready.rs | 11 +++++------ src/types/events/user.rs | 15 ++++++++------- src/types/events/voice.rs | 16 +++++++++------- 7 files changed, 75 insertions(+), 46 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index f5afc38..1045f94 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -101,7 +101,7 @@ impl GatewayMessage { "unknown opcode" | "4001" => { return Some(GatewayError::UnknownOpcodeError); } - "decode error" | "4002" => { + "decode error" | "error while decoding payload" | "4002" => { return Some(GatewayError::DecodeError); } "not authenticated" | "4003" => { @@ -281,8 +281,10 @@ impl GatewayHandle { .await; } - /// Closes the websocket connection and stops all gateway tasks - pub async fn close(&mut self) { + /// Closes the websocket connection and stops all gateway tasks; + /// + /// Esentially pulls the plug on the gateway, leaving it possible to resume; + pub async fn close(&self) { self.kill_send.send(()).unwrap(); self.websocket_send.lock().await.close().await.unwrap(); } @@ -411,6 +413,14 @@ impl Gateway { return; } + if !msg.is_error() && !msg.is_payload() { + println!( + "Message unrecognised: {:?}, please open an issue on the chorus github", + msg.message.to_string() + ); + return; + } + // To:do: handle errors in a good way, maybe observers like events? if msg.is_error() { println!("GW: Received error, connection will close.."); diff --git a/src/types/events/call.rs b/src/types/events/call.rs index 7407ea6..903e9f7 100644 --- a/src/types/events/call.rs +++ b/src/types/events/call.rs @@ -3,9 +3,10 @@ use serde::{Deserialize, Serialize}; use crate::types::{VoiceState, WebSocketEvent}; #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// Officially Undocumented -/// Is sent to a client by the server to signify a new being created -/// {"t":"CALL_CREATE","s":2,"op":0,"d":{"voice_states":[],"ringing":[],"region":"milan","message_id":"1107187514906775613","embedded_activities":[],"channel_id":"837609115475771392"}} +/// Officially Undocumented; +/// Is sent to a client by the server to signify a new call being created; +/// +/// Ex: {"t":"CALL_CREATE","s":2,"op":0,"d":{"voice_states":[],"ringing":[],"region":"milan","message_id":"1107187514906775613","embedded_activities":[],"channel_id":"837609115475771392"}} pub struct CallCreate { pub voice_states: Vec, /// Seems like a vec of channel ids @@ -19,9 +20,10 @@ pub struct CallCreate { impl WebSocketEvent for CallCreate {} #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// Officially Undocumented -/// Updates the status of calls -/// {"t":"CALL_UPDATE","s":5,"op":0,"d":{"ringing":["837606544539254834"],"region":"milan","message_id":"1107191540234846308","guild_id":null,"channel_id":"837609115475771392"}} +/// Officially Undocumented; +/// Updates the client on which calls are ringing, along with a specific call?; +/// +/// Ex: {"t":"CALL_UPDATE","s":5,"op":0,"d":{"ringing":["837606544539254834"],"region":"milan","message_id":"1107191540234846308","guild_id":null,"channel_id":"837609115475771392"}} pub struct CallUpdate { /// Seems like a vec of channel ids pub ringing: Vec, @@ -33,18 +35,19 @@ pub struct CallUpdate { impl WebSocketEvent for CallUpdate {} #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// Officially Undocumented -/// Deletes a ringing call -/// {"t":"CALL_DELETE","s":8,"op":0,"d":{"channel_id":"837609115475771392"}} +/// Officially Undocumented; +/// Deletes a ringing call; +/// Ex: {"t":"CALL_DELETE","s":8,"op":0,"d":{"channel_id":"837609115475771392"}} pub struct CallDelete { pub channel_id: String, } impl WebSocketEvent for CallDelete {} #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// Officially Undocumented -/// See https://unofficial-discord-docs.vercel.app/gateway/op13 -/// {"op":13,"d":{"channel_id":"837609115475771392"}} +/// Officially Undocumented; +/// See https://unofficial-discord-docs.vercel.app/gateway/op13; +/// +/// Ex: {"op":13,"d":{"channel_id":"837609115475771392"}} pub struct CallSync { pub channel_id: String, } diff --git a/src/types/events/guild.rs b/src/types/events/guild.rs index 34dbc01..7364453 100644 --- a/src/types/events/guild.rs +++ b/src/types/events/guild.rs @@ -7,8 +7,9 @@ use serde::{Deserialize, Serialize}; use super::PresenceUpdate; #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-create -/// This one is particularly painful, it can be a Guild object with an extra field or an unavailable guild object +/// See https://discord.com/developers/docs/topics/gateway-events#guild-create; +/// Received to give data about a guild; +// This one is particularly painful, it can be a Guild object with an extra field or an unavailable guild object pub struct GuildCreate { #[serde(flatten)] pub d: GuildCreateDataOption, @@ -29,7 +30,8 @@ impl Default for GuildCreateDataOption { impl WebSocketEvent for GuildCreate {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-ban-add-guild-ban-add-event-fields +/// See https://discord.com/developers/docs/topics/gateway-events#guild-ban-add-guild-ban-add-event-fields; +/// Received to give info about a user being banned from a guild; pub struct GuildBanAdd { pub guild_id: String, pub user: PublicUser, @@ -38,7 +40,8 @@ pub struct GuildBanAdd { impl WebSocketEvent for GuildBanAdd {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-ban-remove +/// See https://discord.com/developers/docs/topics/gateway-events#guild-ban-remove; +/// Received to give info about a user being unbanned from a guild; pub struct GuildBanRemove { pub guild_id: String, pub user: PublicUser, @@ -47,7 +50,8 @@ pub struct GuildBanRemove { impl WebSocketEvent for GuildBanRemove {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-update +/// See https://discord.com/developers/docs/topics/gateway-events#guild-update; +/// Received to give info about a guild being updated; pub struct GuildUpdate { #[serde(flatten)] pub guild: Guild, @@ -56,7 +60,8 @@ pub struct GuildUpdate { impl WebSocketEvent for GuildUpdate {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-delete +/// See https://discord.com/developers/docs/topics/gateway-events#guild-delete; +/// Received to tell the client about a guild being deleted; pub struct GuildDelete { #[serde(flatten)] pub guild: UnavailableGuild, @@ -65,7 +70,8 @@ pub struct GuildDelete { impl WebSocketEvent for GuildDelete {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-audit-log-entry-create +/// See https://discord.com/developers/docs/topics/gateway-events#guild-audit-log-entry-create; +/// Received to the client about an audit log entry being added; pub struct GuildAuditLogEntryCreate { #[serde(flatten)] pub entry: AuditLogEntry, @@ -74,7 +80,8 @@ pub struct GuildAuditLogEntryCreate { impl WebSocketEvent for GuildAuditLogEntryCreate {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-emojis-update +/// See https://discord.com/developers/docs/topics/gateway-events#guild-emojis-update; +/// Received to tell the client about a change to a guild's emoji list; pub struct GuildEmojisUpdate { pub guild_id: String, pub emojis: Vec, @@ -83,7 +90,8 @@ pub struct GuildEmojisUpdate { impl WebSocketEvent for GuildEmojisUpdate {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-stickers-update +/// See https://discord.com/developers/docs/topics/gateway-events#guild-stickers-update; +/// Received to tell the client about a change to a guild's sticker list; pub struct GuildStickersUpdate { pub guild_id: String, pub stickers: Vec, @@ -100,7 +108,8 @@ pub struct GuildIntegrationsUpdate { impl WebSocketEvent for GuildIntegrationsUpdate {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-member-add +/// See https://discord.com/developers/docs/topics/gateway-events#guild-member-add; +/// Received to tell the client about a user joining a guild; pub struct GuildMemberAdd { #[serde(flatten)] pub member: GuildMember, @@ -110,7 +119,8 @@ pub struct GuildMemberAdd { impl WebSocketEvent for GuildMemberAdd {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#guild-member-remove +/// See https://discord.com/developers/docs/topics/gateway-events#guild-member-remove; +/// Received to tell the client about a user leaving a guild; pub struct GuildMemberRemove { pub guild_id: String, pub user: PublicUser, diff --git a/src/types/events/hello.rs b/src/types/events/hello.rs index 85ef1f2..d84dafe 100644 --- a/src/types/events/hello.rs +++ b/src/types/events/hello.rs @@ -2,6 +2,7 @@ use crate::types::events::WebSocketEvent; use serde::{Deserialize, Serialize}; #[derive(Debug, Default, Deserialize, Serialize, Clone)] +/// Received on gateway init, tells the client how often to send heartbeats; pub struct GatewayHello { pub op: i32, pub d: HelloData, @@ -10,7 +11,10 @@ pub struct GatewayHello { impl WebSocketEvent for GatewayHello {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] +/// Contains info on how often the client should send heartbeats to the server; pub struct HelloData { + /// How often a client should send heartbeats, in milliseconds + // u128 because std used u128s for milliseconds pub heartbeat_interval: u128, } diff --git a/src/types/events/ready.rs b/src/types/events/ready.rs index e637b82..7adae03 100644 --- a/src/types/events/ready.rs +++ b/src/types/events/ready.rs @@ -5,9 +5,9 @@ use crate::types::{Activity, GuildMember, PresenceUpdate, VoiceState}; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// Sort of documented, though most fields are left out -/// For a full example see https://gist.github.com/kozabrada123/a347002b1fb8825a5727e40746d4e199 -/// to:do add all undocumented fields +/// 1/2 half documented; +/// Received after identifying, provides initial user info; +/// See https://discord.com/developers/docs/topics/gateway-events#ready; pub struct GatewayReady { pub analytics_token: Option, pub auth_session_id_hash: Option, @@ -28,9 +28,8 @@ pub struct GatewayReady { impl WebSocketEvent for GatewayReady {} #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// Officially Undocumented -/// Sent after the READY event when a client is a user -/// {"t":"READY_SUPPLEMENTAL","s":2,"op":0,"d":{"merged_presences":{"guilds":[[{"user_id":"463640391196082177","status":"online","game":null,"client_status":{"web":"online"},"activities":[]}]],"friends":[{"user_id":"463640391196082177","status":"online","last_modified":1684053508443,"client_status":{"web":"online"},"activities":[]}]},"merged_members":[[{"user_id":"463640391196082177","roles":[],"premium_since":null,"pending":false,"nick":"pog","mute":false,"joined_at":"2021-05-30T15:24:08.763000+00:00","flags":0,"deaf":false,"communication_disabled_until":null,"avatar":null}]],"lazy_private_channels":[],"guilds":[{"voice_states":[],"id":"848582562217590824","embedded_activities":[]}],"disclose":["pomelo"]}} +/// Officially Undocumented; +/// Sent after the READY event when a client is a user, seems to somehow add onto the ready event; pub struct GatewayReadySupplemental { pub merged_presences: MergedPresences, pub merged_members: Vec>, diff --git a/src/types/events/user.rs b/src/types/events/user.rs index 01c1a4b..fa9d729 100644 --- a/src/types/events/user.rs +++ b/src/types/events/user.rs @@ -4,7 +4,8 @@ use crate::types::utils::Snowflake; use serde::{Deserialize, Serialize}; #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#user-update +/// See https://discord.com/developers/docs/topics/gateway-events#user-update; +/// Sent to indicate updates to a user object; (name changes, discriminator changes, etc); pub struct UserUpdate { #[serde(flatten)] pub user: PublicUser, @@ -13,11 +14,11 @@ pub struct UserUpdate { impl WebSocketEvent for UserUpdate {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// Undocumented +/// Undocumented; /// -/// Possibly an update for muted guild / channel settings for the current user +/// Possibly an update for muted guild / channel settings for the current user; /// -/// {"version":2,"suppress_roles":false,"suppress_everyone":false,"notify_highlights":0,"muted":false,"mute_scheduled_events":false,"mute_config":null,"mobile_push":true,"message_notifications":1,"hide_muted_channels":false,"guild_id":"848582562217590824","flags":0,"channel_overrides":[{"muted":false,"mute_config":null,"message_notifications":3,"flags":4096,"collapsed":false,"channel_id":"1042689182893604885"}]} +/// Ex: {"version":2,"suppress_roles":false,"suppress_everyone":false,"notify_highlights":0,"muted":false,"mute_scheduled_events":false,"mute_config":null,"mobile_push":true,"message_notifications":1,"hide_muted_channels":false,"guild_id":"848582562217590824","flags":0,"channel_overrides":[{"muted":false,"mute_config":null,"message_notifications":3,"flags":4096,"collapsed":false,"channel_id":"1042689182893604885"}]} pub struct UserGuildSettingsUpdate { pub version: u8, pub suppress_roles: bool, @@ -38,11 +39,11 @@ pub struct UserGuildSettingsUpdate { impl WebSocketEvent for UserGuildSettingsUpdate {} #[derive(Debug, Default, Deserialize, Serialize, Clone)] -/// Undocumented +/// Undocumented; /// -/// Received in [UserGuildSettingsUpdate] +/// Received in [UserGuildSettingsUpdate]; /// -/// {"muted":false,"mute_config":null,"message_notifications":3,"flags":4096,"collapsed":false,"channel_id":"1042689182893604885"} +/// Ex: {"muted":false,"mute_config":null,"message_notifications":3,"flags":4096,"collapsed":false,"channel_id":"1042689182893604885"} pub struct UserGuildSettingsChannelOverride { pub muted: bool, /// ?? diff --git a/src/types/events/voice.rs b/src/types/events/voice.rs index c7b4f62..3d458bc 100644 --- a/src/types/events/voice.rs +++ b/src/types/events/voice.rs @@ -2,11 +2,11 @@ use crate::types::{events::WebSocketEvent, VoiceState}; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#update-voice-state +/// See https://discord.com/developers/docs/topics/gateway-events#update-voice-state; /// -/// Sent to the server +/// Sent to the server to indicate an update of the voice state (leave voice channel, join voice channel, mute, deafen); /// -/// Not to be confused with [VoiceStateUpdate] +/// Not to be confused with [VoiceStateUpdate]; pub struct UpdateVoiceState { pub guild_id: Option, pub channel_id: Option, @@ -17,11 +17,11 @@ pub struct UpdateVoiceState { impl WebSocketEvent for UpdateVoiceState {} #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#voice-state-update +/// See https://discord.com/developers/docs/topics/gateway-events#voice-state-update; /// -/// Received from the server +/// Received from the server to indicate an update in a user's voice state (leave voice channel, join voice channel, mute, deafen, etc); /// -/// Not to be confused with [UpdateVoiceState] +/// Not to be confused with [UpdateVoiceState]; pub struct VoiceStateUpdate { #[serde(flatten)] pub state: VoiceState, @@ -30,7 +30,9 @@ pub struct VoiceStateUpdate { impl WebSocketEvent for VoiceStateUpdate {} #[derive(Debug, Deserialize, Serialize, Default, Clone)] -/// See https://discord.com/developers/docs/topics/gateway-events#voice-server-update +/// See https://discord.com/developers/docs/topics/gateway-events#voice-server-update; +/// +/// Received to indicate which voice endpoint, token and guild_id to use; pub struct VoiceServerUpdate { pub token: String, pub guild_id: String, From d5b883a08852fc0b39b499a0584d6bc841d02c44 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Sat, 10 Jun 2023 12:43:07 +0200 Subject: [PATCH 12/15] Refactor, better deserialization error handling --- src/gateway.rs | 1580 +++++++++++++++++++++++++++--------------------- 1 file changed, 876 insertions(+), 704 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 1045f94..c87b1d8 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -2,11 +2,13 @@ use crate::errors::GatewayError; use crate::errors::ObserverError; use crate::gateway::events::Events; use crate::types; +use crate::types::WebSocketEvent; use futures_util::stream::SplitSink; use futures_util::stream::SplitStream; use futures_util::SinkExt; use futures_util::StreamExt; use native_tls::TlsConnector; +use serde_json::value::RawValue; use std::sync::Arc; use tokio::net::TcpStream; use tokio::sync::mpsc::error::TryRecvError; @@ -388,7 +390,7 @@ impl Gateway { if msg.as_ref().is_some() { if msg.as_ref().unwrap().is_ok() { let msg_unwrapped = msg.unwrap().unwrap(); - self.handle_event(GatewayMessage::from_tungstenite_message(msg_unwrapped)) + self.handle_message(GatewayMessage::from_tungstenite_message(msg_unwrapped)) .await; continue; @@ -407,8 +409,24 @@ impl Gateway { self.websocket_send.lock().await.close().await.unwrap(); } + /// Deserializes and updates a dispatched event, when we already know its type; + /// (Called for every event in handle_message) + async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>( + data: &'a str, + event: &mut GatewayEvent, + ) -> Result<(), serde_json::Error> { + let data_deserialize_result: Result = serde_json::from_str(data); + + if data_deserialize_result.is_err() { + return Err(data_deserialize_result.err().unwrap()); + } + + event.update_data(data_deserialize_result.unwrap()).await; + return Ok(()); + } + /// This handles a message as a websocket event and updates its events along with the events' observers - pub async fn handle_event(&mut self, msg: GatewayMessage) { + pub async fn handle_message(&mut self, msg: GatewayMessage) { if msg.is_empty() { return; } @@ -451,858 +469,1012 @@ impl Gateway { // "Some" of these are undocumented match gateway_payload_t.as_str() { "READY" => { - let new_data: types::GatewayReady = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .session - .ready - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.session.ready; + + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "READY_SUPPLEMENTAL" => { - let new_data: types::GatewayReadySupplemental = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .session - .ready_supplemental - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.session.ready_supplemental; + + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "RESUMED" => {} "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => { - let new_data: types::ApplicationCommandPermissionsUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events + let event = &mut self + .events .lock() .await .application - .command_permissions_update - .update_data(new_data) - .await; + .command_permissions_update; + + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "AUTO_MODERATION_RULE_CREATE" => { - let new_data: types::AutoModerationRuleCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .auto_moderation - .rule_create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.auto_moderation.rule_create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "AUTO_MODERATION_RULE_UPDATE" => { - let new_data: types::AutoModerationRuleUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .auto_moderation - .rule_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.auto_moderation.rule_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "AUTO_MODERATION_RULE_DELETE" => { - let new_data: types::AutoModerationRuleDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .auto_moderation - .rule_delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.auto_moderation.rule_delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "AUTO_MODERATION_ACTION_EXECUTION" => { - let new_data: types::AutoModerationActionExecution = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .auto_moderation - .action_execution - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.auto_moderation.action_execution; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "CHANNEL_CREATE" => { - let new_data: types::ChannelCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .channel - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.channel.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "CHANNEL_UPDATE" => { - let new_data: types::ChannelUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .channel - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.channel.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "CHANNEL_UNREAD_UPDATE" => { - let new_data: types::ChannelUnreadUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .channel - .unread_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.channel.unread_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "CHANNEL_DELETE" => { - let new_data: types::ChannelDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .channel - .delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.channel.delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "CHANNEL_PINS_UPDATE" => { - let new_data: types::ChannelPinsUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .channel - .pins_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.channel.pins_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "CALL_CREATE" => { - let new_data: types::CallCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .call - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.call.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "CALL_UPDATE" => { - let new_data: types::CallUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .call - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.call.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "CALL_DELETE" => { - let new_data: types::CallDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .call - .delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.call.delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "THREAD_CREATE" => { - let new_data: types::ThreadCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .thread - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.thread.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "THREAD_UPDATE" => { - let new_data: types::ThreadUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .thread - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.thread.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "THREAD_DELETE" => { - let new_data: types::ThreadDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .thread - .delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.thread.delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "THREAD_LIST_SYNC" => { - let new_data: types::ThreadListSync = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .thread - .list_sync - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.thread.list_sync; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "THREAD_MEMBER_UPDATE" => { - let new_data: types::ThreadMemberUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .thread - .member_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.thread.member_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "THREAD_MEMBERS_UPDATE" => { - let new_data: types::ThreadMembersUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .thread - .members_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.thread.members_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_CREATE" => { - let new_data: types::GuildCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_UPDATE" => { - let new_data: types::GuildUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_DELETE" => { - let new_data: types::GuildDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_AUDIT_LOG_ENTRY_CREATE" => { - let new_data: types::GuildAuditLogEntryCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .audit_log_entry_create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.audit_log_entry_create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_BAN_ADD" => { - let new_data: types::GuildBanAdd = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .ban_add - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.ban_add; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_BAN_REMOVE" => { - let new_data: types::GuildBanRemove = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .ban_remove - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.ban_remove; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_EMOJIS_UPDATE" => { - let new_data: types::GuildEmojisUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .emojis_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.emojis_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_STICKERS_UPDATE" => { - let new_data: types::GuildStickersUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .stickers_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.stickers_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_INTEGRATIONS_UPDATE" => { - let new_data: types::GuildIntegrationsUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .integrations_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.integrations_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_MEMBER_ADD" => { - let new_data: types::GuildMemberAdd = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .member_add - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.member_add; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_MEMBER_REMOVE" => { - let new_data: types::GuildMemberRemove = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .member_remove - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.member_remove; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_MEMBER_UPDATE" => { - let new_data: types::GuildMemberUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .member_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.member_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_MEMBERS_CHUNK" => { - let new_data: types::GuildMembersChunk = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .members_chunk - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.members_chunk; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_ROLE_CREATE" => { - let new_data: types::GuildRoleCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .role_create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.role_create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_ROLE_UPDATE" => { - let new_data: types::GuildRoleUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .role_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.role_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_ROLE_DELETE" => { - let new_data: types::GuildRoleDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .role_delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.role_delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_SCHEDULED_EVENT_CREATE" => { - let new_data: types::GuildScheduledEventCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .role_scheduled_event_create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.role_scheduled_event_create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_SCHEDULED_EVENT_UPDATE" => { - let new_data: types::GuildScheduledEventUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .role_scheduled_event_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.role_scheduled_event_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_SCHEDULED_EVENT_DELETE" => { - let new_data: types::GuildScheduledEventDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .role_scheduled_event_delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.role_scheduled_event_delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_SCHEDULED_EVENT_USER_ADD" => { - let new_data: types::GuildScheduledEventUserAdd = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .role_scheduled_event_user_add - .update_data(new_data) - .await; + let event = + &mut self.events.lock().await.guild.role_scheduled_event_user_add; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "GUILD_SCHEDULED_EVENT_USER_REMOVE" => { - let new_data: types::GuildScheduledEventUserRemove = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events + let event = &mut self + .events .lock() .await .guild - .role_scheduled_event_user_remove - .update_data(new_data) - .await; + .role_scheduled_event_user_remove; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "PASSIVE_UPDATE_V1" => { - let new_data: types::PassiveUpdateV1 = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .guild - .passive_update_v1 - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.guild.passive_update_v1; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "INTEGRATION_CREATE" => { - let new_data: types::IntegrationCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .integration - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.integration.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "INTEGRATION_UPDATE" => { - let new_data: types::IntegrationUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .integration - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.integration.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "INTEGRATION_DELETE" => { - let new_data: types::IntegrationDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .integration - .delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.integration.delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "INTERACTION_CREATE" => { - let new_data: types::InteractionCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .interaction - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.interaction.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "INVITE_CREATE" => { - let new_data: types::InviteCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .invite - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.invite.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "INVITE_DELETE" => { - let new_data: types::InviteDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .invite - .delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.invite.delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_CREATE" => { - let new_data: types::MessageCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_UPDATE" => { - let new_data: types::MessageUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_DELETE" => { - let new_data: types::MessageDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_DELETE_BULK" => { - let new_data: types::MessageDeleteBulk = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .delete_bulk - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.delete_bulk; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_REACTION_ADD" => { - let new_data: types::MessageReactionAdd = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .reaction_add - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.reaction_add; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_REACTION_REMOVE" => { - let new_data: types::MessageReactionRemove = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .reaction_remove - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.reaction_remove; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_REACTION_REMOVE_ALL" => { - let new_data: types::MessageReactionRemoveAll = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .reaction_remove_all - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.reaction_remove_all; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_REACTION_REMOVE_EMOJI" => { - let new_data: types::MessageReactionRemoveEmoji = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .reaction_remove_emoji - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.reaction_remove_emoji; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "MESSAGE_ACK" => { - let new_data: types::MessageACK = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .message - .ack - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.message.ack; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "PRESENCE_UPDATE" => { - let new_data: types::PresenceUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .user - .presence_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.user.presence_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "RELATIONSHIP_ADD" => { - let new_data: types::RelationshipAdd = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .relationship - .add - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.relationship.add; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "RELATIONSHIP_REMOVE" => { - let new_data: types::RelationshipRemove = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .relationship - .remove - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.relationship.remove; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "STAGE_INSTANCE_CREATE" => { - let new_data: types::StageInstanceCreate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .stage_instance - .create - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.stage_instance.create; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "STAGE_INSTANCE_UPDATE" => { - let new_data: types::StageInstanceUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .stage_instance - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.stage_instance.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "STAGE_INSTANCE_DELETE" => { - let new_data: types::StageInstanceDelete = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .stage_instance - .delete - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.stage_instance.delete; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "SESSIONS_REPLACE" => { - let sessions: Vec = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - let new_data = types::SessionsReplace { sessions }; + let result: Result, serde_json::Error> = + serde_json::from_str(gateway_payload.event_data.unwrap().get()); + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } + + let data = types::SessionsReplace { + sessions: result.unwrap(), + }; + self.events .lock() .await .session .replace - .update_data(new_data) - .await; - } - "TYPING_START" => { - let new_data: types::TypingStartEvent = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .user - .typing_start_event - .update_data(new_data) + .update_data(data) .await; } "USER_UPDATE" => { - let new_data: types::UserUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .user - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.user.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "USER_GUILD_SETTINGS_UPDATE" => { - let new_data: types::UserGuildSettingsUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .user - .guild_settings_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.user.guild_settings_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "VOICE_STATE_UPDATE" => { - let new_data: types::VoiceStateUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .voice - .state_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.voice.state_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "VOICE_SERVER_UPDATE" => { - let new_data: types::VoiceServerUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .voice - .server_update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.voice.server_update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } "WEBHOOKS_UPDATE" => { - let new_data: types::WebhooksUpdate = - serde_json::from_str(gateway_payload.event_data.unwrap().get()) - .unwrap(); - self.events - .lock() - .await - .webhooks - .update - .update_data(new_data) - .await; + let event = &mut self.events.lock().await.webhooks.update; + let result = + Gateway::handle_event(gateway_payload.event_data.unwrap().get(), event) + .await; + if result.is_err() { + println!( + "Failed to parse gateway event {} ({})", + gateway_payload_t, + result.err().unwrap() + ); + return; + } } _ => { println!("Received unrecognized gateway event ({})! Please open an issue on the chorus github so we can implement it", &gateway_payload_t); @@ -1812,7 +1984,7 @@ mod example { let consumer = Consumer; let arc_mut_consumer = Arc::new(Mutex::new(consumer)); - event.subscribe(arc_mut_consumer.clone()); + event.subscribe(arc_mut_consumer.clone()).unwrap(); event.notify().await; From 78e63657c8109d8e9e5d95365aadd0e9d40c75af Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Sat, 10 Jun 2023 15:37:02 +0200 Subject: [PATCH 13/15] Change observer update ref to &mut from & --- examples/gateway_observers.rs | 2 +- src/gateway.rs | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/examples/gateway_observers.rs b/examples/gateway_observers.rs index 0920d61..166b84a 100644 --- a/examples/gateway_observers.rs +++ b/examples/gateway_observers.rs @@ -17,7 +17,7 @@ pub struct ExampleObserver {} // One struct can be an observer of multiple websocketevents, if needed impl Observer for ExampleObserver { // After we subscribe to an event this function is called every time we receive it - fn update(&self, _data: &GatewayReady) { + fn update(&mut self, _data: &GatewayReady) { println!("Observed Ready!"); } } diff --git a/src/gateway.rs b/src/gateway.rs index c87b1d8..e49e491 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -351,8 +351,11 @@ impl Gateway { let gateway_hello: types::HelloData = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap(); + let events = Events::default(); + let shared_events = Arc::new(Mutex::new(events)); + let mut gateway = Gateway { - events: Arc::new(Mutex::new(Events::default())), + events: shared_events.clone(), heartbeat_handler: HeartbeatHandler::new( gateway_hello.heartbeat_interval, shared_websocket_send.clone(), @@ -363,8 +366,6 @@ impl Gateway { kill_send: kill_send.clone(), }; - let shared_events = gateway.events.clone(); - // Now we can continuously check for messages in a different task, since we aren't going to receive another hello let handle: JoinHandle<()> = task::spawn(async move { gateway.gateway_listen_task().await; @@ -1715,7 +1716,7 @@ an Observable. The Observer is notified when the Observable's data changes. In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. */ pub trait Observer: std::fmt::Debug { - fn update(&self, data: &T); + fn update(&mut self, data: &T); } /** GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a @@ -1790,7 +1791,9 @@ impl GatewayEvent { */ async fn notify(&self) { for observer in &self.observers { - observer.lock().await.update(&self.event_data); + let mut observer_lock = observer.lock().await; + observer_lock.update(&self.event_data); + drop(observer_lock); } } } @@ -1962,7 +1965,7 @@ mod example { #[derive(Debug)] struct Consumer; impl Observer for Consumer { - fn update(&self, data: &types::GatewayResume) { + fn update(&mut self, data: &types::GatewayResume) { println!("{}", data.token) } } From 3db9114ecf19cb2d6d312bfc23d8bac686c8f277 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Sat, 10 Jun 2023 16:32:42 +0200 Subject: [PATCH 14/15] turns out UpdatePresence and PresenceUpdate are different events --- src/gateway.rs | 4 ++-- src/types/events/presence.rs | 22 ++++++++++++++++++---- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index e49e491..c8c5be9 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -235,10 +235,10 @@ impl GatewayHandle { } /// Sends an update presence event to the gateway - pub async fn send_update_presence(&self, to_send: types::PresenceUpdate) { + pub async fn send_update_presence(&self, to_send: types::UpdatePresence) { let to_send_value = serde_json::to_value(&to_send).unwrap(); - println!("GW: Sending Presence Update.."); + println!("GW: Sending Update Presence.."); self.send_json_event(GATEWAY_UPDATE_PRESENCE, to_send_value) .await; diff --git a/src/types/events/presence.rs b/src/types/events/presence.rs index dd49fde..aacc170 100644 --- a/src/types/events/presence.rs +++ b/src/types/events/presence.rs @@ -1,14 +1,28 @@ -use crate::types::events::WebSocketEvent; use crate::types::interfaces::Activity; -use crate::types::PublicUser; +use crate::types::{events::WebSocketEvent, UserStatus}; +use crate::types::{PublicUser, Snowflake}; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Default, Clone)] +/// Sent by the client to update its status and presence; +/// See https://discord.com/developers/docs/topics/gateway-events#update-presence +pub struct UpdatePresence { + /// unix time of when the client went idle, or none if client is not idle + pub since: Option, + /// the client's status (online, invisible, offline, dnd, idle..) + pub status: UserStatus, + pub activities: Vec, + pub afk: bool, +} + +#[derive(Debug, Deserialize, Serialize, Default, Clone)] +/// Received to tell the client that a user updated their presence / status /// See https://discord.com/developers/docs/topics/gateway-events#presence-update-presence-update-event-fields pub struct PresenceUpdate { pub user: PublicUser, - pub guild_id: Option, - pub status: String, + #[serde(default)] + pub guild_id: Option, + pub status: UserStatus, pub activities: Vec, pub client_status: ClientStatusObject, } From 17194a2fcc1c77824547ab1d4f4454c5658cefe5 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Sat, 10 Jun 2023 16:56:50 +0200 Subject: [PATCH 15/15] Implement display for UserStatus --- src/gateway.rs | 1 - src/types/entities/user_settings.rs | 6 ++++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index c8c5be9..78e202d 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -8,7 +8,6 @@ use futures_util::stream::SplitStream; use futures_util::SinkExt; use futures_util::StreamExt; use native_tls::TlsConnector; -use serde_json::value::RawValue; use std::sync::Arc; use tokio::net::TcpStream; use tokio::sync::mpsc::error::TryRecvError; diff --git a/src/types/entities/user_settings.rs b/src/types/entities/user_settings.rs index 7b9541f..40b936a 100644 --- a/src/types/entities/user_settings.rs +++ b/src/types/entities/user_settings.rs @@ -13,6 +13,12 @@ pub enum UserStatus { Invisible, } +impl std::fmt::Display for UserStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap()) + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] #[cfg_attr(feature = "sqlx", derive(sqlx::Type))] #[serde(rename_all = "lowercase")]