2023-04-30 14:45:15 +02:00
use std ::sync ::Arc ;
2023-04-28 18:18:32 +02:00
use crate ::api ::types ::* ;
use crate ::api ::WebSocketEvent ;
use crate ::errors ::ObserverError ;
use crate ::gateway ::events ::Events ;
2023-04-30 14:45:15 +02:00
use futures_util ::SinkExt ;
2023-04-28 23:21:55 +02:00
use futures_util ::StreamExt ;
2023-05-07 11:58:12 +02:00
use futures_util ::stream ::SplitSink ;
2023-04-30 11:56:14 +02:00
use native_tls ::TlsConnector ;
2023-04-28 18:18:32 +02:00
use tokio ::net ::TcpStream ;
2023-05-05 18:38:04 +02:00
use tokio ::sync ::mpsc ;
2023-05-07 11:58:12 +02:00
use tokio ::sync ::mpsc ::error ::TryRecvError ;
2023-05-13 16:35:42 +02:00
use tokio ::sync ::mpsc ::Sender ;
2023-04-30 11:56:14 +02:00
use tokio ::sync ::Mutex ;
2023-04-30 14:45:15 +02:00
use tokio ::task ;
2023-05-05 18:38:04 +02:00
use tokio ::time ;
use tokio ::time ::Instant ;
2023-05-07 11:58:12 +02:00
use tokio_tungstenite ::MaybeTlsStream ;
use tokio_tungstenite ::{ WebSocketStream , Connector , connect_async_tls_with_config } ;
2023-05-03 21:06:01 +02:00
2023-05-11 22:47:31 +02:00
#[ derive(Debug) ]
2023-04-28 13:40:29 +02:00
/**
2023-05-11 22:47:31 +02:00
Represents a handle to a Gateway connection . A Gateway connection will create observable
2023-04-28 18:18:32 +02:00
[ ` GatewayEvents ` ] ( GatewayEvent ) , which you can subscribe to . Gateway events include all currently
implemented [ Types ] with the trait [ ` WebSocketEvent ` ]
2023-05-11 22:47:31 +02:00
Using this handle you can also send Gateway Events directly .
2023-04-28 18:18:32 +02:00
* /
2023-05-11 22:47:31 +02:00
pub struct GatewayHandle {
2023-04-28 18:18:32 +02:00
pub url : String ,
2023-05-11 22:47:31 +02:00
pub events : Arc < Mutex < Events > > ,
pub websocket_tx : Arc < Mutex < SplitSink < WebSocketStream < MaybeTlsStream < TcpStream > > , tokio_tungstenite ::tungstenite ::Message > > > ,
2023-04-28 18:18:32 +02:00
}
2023-05-11 22:47:31 +02:00
impl GatewayHandle {
2023-05-05 19:23:57 +02:00
/// Sends json to the gateway with an opcode
2023-05-05 20:58:00 +02:00
async fn send_json_event ( & self , op : u8 , to_send : serde_json ::Value ) {
2023-05-05 19:23:57 +02:00
2023-05-05 20:58:00 +02:00
let gateway_payload = GatewayPayload { op , d : Some ( to_send ) , s : None , t : None } ;
2023-05-05 19:23:57 +02:00
let payload_json = serde_json ::to_string ( & gateway_payload ) . unwrap ( ) ;
let message = tokio_tungstenite ::tungstenite ::Message ::text ( payload_json ) ;
2023-05-11 22:47:31 +02:00
self . websocket_tx . lock ( ) . await . send ( message ) . await . unwrap ( ) ;
2023-05-05 19:23:57 +02:00
}
/// Sends an identify event to the gateway
pub async fn send_identify ( & self , to_send : GatewayIdentifyPayload ) {
2023-05-05 20:58:00 +02:00
let to_send_value = serde_json ::to_value ( & to_send ) . unwrap ( ) ;
2023-05-05 19:23:57 +02:00
2023-05-07 12:44:11 +02:00
println! ( " GW: Sending Identify.. " ) ;
2023-05-05 20:58:00 +02:00
self . send_json_event ( 2 , to_send_value ) . await ;
2023-05-05 19:23:57 +02:00
}
/// Sends a resume event to the gateway
pub async fn send_resume ( & self , to_send : GatewayResume ) {
2023-05-05 20:58:00 +02:00
let to_send_value = serde_json ::to_value ( & to_send ) . unwrap ( ) ;
2023-05-05 19:23:57 +02:00
2023-05-07 12:44:11 +02:00
println! ( " GW: Sending Resume.. " ) ;
2023-05-05 20:58:00 +02:00
self . send_json_event ( 6 , to_send_value ) . await ;
2023-05-05 19:23:57 +02:00
}
/// Sends an update presence event to the gateway
pub async fn send_update_presence ( & self , to_send : PresenceUpdate ) {
2023-05-05 20:58:00 +02:00
let to_send_value = serde_json ::to_value ( & to_send ) . unwrap ( ) ;
2023-05-05 19:23:57 +02:00
2023-05-13 15:59:46 +02:00
println! ( " GW: Sending Presence Update.. " ) ;
2023-05-05 20:58:00 +02:00
self . send_json_event ( 3 , to_send_value ) . await ;
2023-04-30 14:45:15 +02:00
}
2023-05-13 15:59:46 +02:00
/// Sends a Request Guild Members to the server
pub async fn send_request_guild_members ( & self , to_send : GatewayRequestGuildMembers ) {
let to_send_value = serde_json ::to_value ( & to_send ) . unwrap ( ) ;
println! ( " GW: Sending Request Guild Members.. " ) ;
self . send_json_event ( 8 , to_send_value ) . await ;
}
/// Sends a Request Guild Members to the server
pub async fn send_update_voice_state ( & self , to_send : GatewayVoiceStateUpdate ) {
let to_send_value = serde_json ::to_value ( & to_send ) . unwrap ( ) ;
println! ( " GW: Sending Voice State Update.. " ) ;
self . send_json_event ( 4 , to_send_value ) . await ;
}
2023-04-30 14:45:15 +02:00
}
2023-05-11 22:47:31 +02:00
pub struct Gateway {
pub events : Arc < Mutex < Events > > ,
heartbeat_handler : Option < HeartbeatHandler > ,
pub websocket_tx : Arc < Mutex < SplitSink < WebSocketStream < MaybeTlsStream < TcpStream > > , tokio_tungstenite ::tungstenite ::Message > > >
}
2023-05-13 16:43:29 +02:00
impl Gateway {
2023-05-11 22:47:31 +02:00
pub async fn new (
websocket_url : String ,
) -> Result < GatewayHandle , tokio_tungstenite ::tungstenite ::Error > {
let ( ws_stream , _ ) = match connect_async_tls_with_config (
& websocket_url ,
None ,
Some ( Connector ::NativeTls (
TlsConnector ::builder ( ) . build ( ) . unwrap ( ) ,
) ) ,
)
. await
{
Ok ( ws_stream ) = > ws_stream ,
2023-05-13 09:47:12 +02:00
Err ( e ) = > return Err ( e ) ,
2023-05-11 22:47:31 +02:00
} ;
let ( ws_tx , mut ws_rx ) = ws_stream . split ( ) ;
let shared_tx = Arc ::new ( Mutex ::new ( ws_tx ) ) ;
let mut gateway = Gateway { events : Arc ::new ( Mutex ::new ( Events ::default ( ) ) ) , heartbeat_handler : None , websocket_tx : shared_tx . clone ( ) } ;
let shared_events = gateway . events . clone ( ) ;
2023-05-13 09:47:12 +02:00
// Wait for the first hello and then spawn both tasks so we avoid nested tasks
// This automatically spawns the heartbeat task, but from the main thread
let msg = ws_rx . next ( ) . await . unwrap ( ) . unwrap ( ) ;
let gateway_payload : GatewayPayload = serde_json ::from_str ( msg . to_text ( ) . unwrap ( ) ) . unwrap ( ) ;
if gateway_payload . op ! = 10 {
println! ( " Recieved non hello on gateway init, what is happening? " ) ;
return Err ( tokio_tungstenite ::tungstenite ::Error ::Protocol ( tokio_tungstenite ::tungstenite ::error ::ProtocolError ::InvalidOpcode ( gateway_payload . op ) ) )
}
println! ( " GW: Received Hello " ) ;
let gateway_hello : HelloData = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
gateway . heartbeat_handler = Some ( HeartbeatHandler ::new ( gateway_hello . heartbeat_interval , shared_tx . clone ( ) ) ) ;
// Now we can continously check for messages in a different task, since we aren't going to receive another hello
2023-05-11 22:47:31 +02:00
task ::spawn ( async move {
loop {
let msg = ws_rx . next ( ) . await ;
if msg . as_ref ( ) . is_some ( ) {
let msg_unwrapped = msg . unwrap ( ) . unwrap ( ) ;
gateway . handle_event ( msg_unwrapped ) . await ;
} ;
}
} ) ;
return Ok ( GatewayHandle {
url : websocket_url . clone ( ) ,
events : shared_events ,
websocket_tx : shared_tx . clone ( ) ,
} ) ;
}
/// This handles a message as a websocket event and updates its events along with the events' observers
pub async fn handle_event ( & mut self , msg : tokio_tungstenite ::tungstenite ::Message ) {
if msg . to_string ( ) = = String ::new ( ) {
return ;
}
2023-05-13 21:27:44 +02:00
let msg_string = msg . to_string ( ) ;
2023-05-14 08:20:25 +02:00
println! ( " {} " , & msg_string ) ;
2023-05-13 21:27:44 +02:00
let gateway_payload : GatewayPayload = serde_json ::from_str ( & msg_string ) . unwrap ( ) ;
2023-05-11 22:47:31 +02:00
// See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes
match gateway_payload . op {
// Dispatch
// An event was dispatched, we need to look at the gateway event name t
0 = > {
let gateway_payload_t = gateway_payload . t . unwrap ( ) ;
println! ( " GW: Received {} .. " , gateway_payload_t ) ;
// See https://discord.com/developers/docs/topics/gateway-events#receive-events
match gateway_payload_t . as_str ( ) {
" READY " = > {
2023-05-14 08:20:25 +02:00
let data : GatewayReady = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
println! ( " {:?} " , data ) ;
2023-05-11 22:47:31 +02:00
}
2023-05-13 16:24:34 +02:00
" RESUMED " = > { }
2023-05-11 22:47:31 +02:00
" APPLICATION_COMMAND_PERMISSIONS_UPDATE " = > { }
" AUTO_MODERATION_RULE_CREATE " = > { }
" AUTO_MODERATION_RULE_UPDATE " = > { }
" AUTO_MODERATION_RULE_DELETE " = > { }
" AUTO_MODERATION_ACTION_EXECUTION " = > { }
2023-05-13 21:27:44 +02:00
" CHANNEL_CREATE " = > {
let channel : Channel = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = ChannelCreate { channel } ;
self . events . lock ( ) . await . channel . create . update_data ( new_data ) . await ;
}
" CHANNEL_UPDATE " = > {
let channel : Channel = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = ChannelUpdate { channel } ;
self . events . lock ( ) . await . channel . update . update_data ( new_data ) . await ;
}
" CHANNEL_DELETE " = > {
let channel : Channel = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = ChannelDelete { channel } ;
self . events . lock ( ) . await . channel . delete . update_data ( new_data ) . await ;
}
" CHANNEL_PINS_UPDATE " = > {
let new_data : ChannelPinsUpdate = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . channel . pins_update . update_data ( new_data ) . await ;
}
2023-05-14 08:20:25 +02:00
" CALL_CREATE " = > {
let new_data : CallCreate = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . call . create . update_data ( new_data ) . await ;
}
2023-05-13 21:27:44 +02:00
" THREAD_CREATE " = > {
let thread : Channel = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = ThreadCreate { thread } ;
self . events . lock ( ) . await . thread . create . update_data ( new_data ) . await ;
}
" THREAD_UPDATE " = > {
let thread : Channel = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = ThreadUpdate { thread } ;
self . events . lock ( ) . await . thread . update . update_data ( new_data ) . await ;
}
" THREAD_DELETE " = > {
let thread : Channel = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = ThreadDelete { thread } ;
self . events . lock ( ) . await . thread . delete . update_data ( new_data ) . await ;
}
" THREAD_LIST_SYNC " = > {
let new_data : ThreadListSync = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . thread . list_sync . update_data ( new_data ) . await ;
}
" THREAD_MEMBER_UPDATE " = > {
let new_data : ThreadMemberUpdate = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . thread . member_update . update_data ( new_data ) . await ;
}
" THREAD_MEMBERS_UPDATE " = > {
let new_data : ThreadMembersUpdate = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . thread . members_update . update_data ( new_data ) . await ;
}
" GUILD_CREATE " = > {
let new_data : GuildCreate = serde_json ::from_str ( & msg_string ) . unwrap ( ) ;
self . events . lock ( ) . await . guild . create . update_data ( new_data ) . await ;
}
2023-05-14 08:20:25 +02:00
" GUILD_UPDATE " = > {
let guild : Guild = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = GuildUpdate { guild } ;
self . events . lock ( ) . await . guild . update . update_data ( new_data ) . await ;
}
2023-05-13 16:24:34 +02:00
" GUILD_DELETE " = > {
2023-05-14 08:20:25 +02:00
let guild : UnavailableGuild = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = GuildDelete { guild } ;
self . events . lock ( ) . await . guild . delete . update_data ( new_data ) . await ;
2023-05-13 16:24:34 +02:00
}
2023-05-11 22:47:31 +02:00
" GUILD_AUDIT_LOG_ENTRY_CREATE " = > { }
2023-05-13 16:24:34 +02:00
" GUILD_BAN_ADD " = > {
2023-05-14 08:20:25 +02:00
let new_data : GuildBanAdd = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . guild . ban_add . update_data ( new_data ) . await ;
2023-05-13 16:24:34 +02:00
}
" GUILD_BAN_REMOVE " = > {
2023-05-14 08:20:25 +02:00
let new_data : GuildBanRemove = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . guild . ban_remove . update_data ( new_data ) . await ;
}
" GUILD_EMOJIS_UPDATE " = > {
let new_data : GuildEmojisUpdate = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . guild . emojis_update . update_data ( new_data ) . await ;
2023-05-13 16:24:34 +02:00
}
2023-05-11 22:47:31 +02:00
" GUILD_STICKERS_UPDATE " = > { }
" GUILD_INTEGRATIONS_UPDATE " = > { }
" GUILD_MEMBER_ADD " = > { }
" GUILD_MEMBER_REMOVE " = > { }
" GUILD_MEMBER_UPDATE " = > { }
" GUILD_MEMBERS_CHUNK " = > { }
" GUILD_ROLE_CREATE " = > { }
" GUILD_ROLE_UPDATE " = > { }
" GUILD_ROLE_DELETE " = > { }
" GUILD_SCHEDULED_EVENT_CREATE " = > { }
" GUILD_SCHEDULED_EVENT_UPDATE " = > { }
" GUILD_SCHEDULED_EVENT_DELETE " = > { }
" GUILD_SCHEDULED_EVENT_USER_ADD " = > { }
" GUILD_SCHEDULED_EVENT_USER_REMOVE " = > { }
" INTEGRATION_CREATE " = > { }
" INTEGRATION_UPDATE " = > { }
" INTEGRATION_DELETE " = > { }
" INTERACTION_CREATE " = > { }
" INVITE_CREATE " = > { }
" INVITE_DELETE " = > { }
" MESSAGE_CREATE " = > {
let new_data : MessageCreate = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . message . create . update_data ( new_data ) . await ;
}
" MESSAGE_UPDATE " = > {
let new_data : MessageUpdate = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . message . update . update_data ( new_data ) . await ;
}
" MESSAGE_DELETE " = > {
let new_data : MessageDelete = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . message . delete . update_data ( new_data ) . await ;
}
" MESSAGE_DELETE_BULK " = > {
let new_data : MessageDeleteBulk = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . message . delete_bulk . update_data ( new_data ) . await ;
}
" MESSAGE_REACTION_ADD " = > {
let new_data : MessageReactionAdd = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . message . reaction_add . update_data ( new_data ) . await ;
}
" MESSAGE_REACTION_REMOVE " = > {
let new_data : MessageReactionRemove = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . message . reaction_remove . update_data ( new_data ) . await ;
}
" MESSAGE_REACTION_REMOVE_ALL " = > {
let new_data : MessageReactionRemoveAll = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . message . reaction_remove_all . update_data ( new_data ) . await ;
}
" MESSAGE_REACTION_REMOVE_EMOJI " = > {
let new_data : MessageReactionRemoveEmoji = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . message . reaction_remove_emoji . update_data ( new_data ) . await ;
}
" PRESENCE_UPDATE " = > {
let new_data : PresenceUpdate = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . user . presence_update . update_data ( new_data ) . await ;
}
" STAGE_INSTANCE_CREATE " = > { }
" STAGE_INSTANCE_UPDATE " = > { }
" STAGE_INSTANCE_DELETE " = > { }
// Not documented in discord docs, I assume this isnt for bots / apps but is for users?
" SESSIONS_REPLACE " = > { }
" TYPING_START " = > {
let new_data : TypingStartEvent = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
self . events . lock ( ) . await . user . typing_start_event . update_data ( new_data ) . await ;
}
2023-05-13 16:24:34 +02:00
" USER_UPDATE " = > {
let user : UserObject = serde_json ::from_value ( gateway_payload . d . unwrap ( ) ) . unwrap ( ) ;
let new_data = UserUpdate { user } ;
2023-05-13 21:27:44 +02:00
self . events . lock ( ) . await . user . update . update_data ( new_data ) . await ;
2023-05-13 16:24:34 +02:00
}
2023-05-11 22:47:31 +02:00
" VOICE_STATE_UPDATE " = > { }
" VOICE_SERVER_UPDATE " = > { }
" WEBHOOKS_UPDATE " = > { }
_ = > { panic! ( " Invalid gateway event ( {} ) " , & gateway_payload_t ) }
}
}
// Heartbeat
// We received a heartbeat from the server
1 = > { }
// Reconnect
7 = > { todo! ( ) }
// Invalid Session
9 = > { todo! ( ) }
// Hello
// Starts our heartbeat
2023-05-13 09:47:12 +02:00
// We should have already handled this in gateway init
2023-05-11 22:47:31 +02:00
10 = > {
2023-05-13 09:47:12 +02:00
panic! ( " Recieved hello when it was unexpected " ) ;
2023-05-11 22:47:31 +02:00
}
// Heartbeat ACK
11 = > {
println! ( " GW: Received Heartbeat ACK " ) ;
}
2 | 3 | 4 | 6 | 8 = > { panic! ( " Received Gateway op code that's meant to be sent, not received ( {} ) " , gateway_payload . op ) }
_ = > { panic! ( " Received Invalid Gateway op code ( {} ) " , gateway_payload . op ) }
}
// If we have an active heartbeat thread and we received a seq number we should let it know
if gateway_payload . s . is_some ( ) {
if self . heartbeat_handler . is_some ( ) {
let heartbeat_communication = HeartbeatThreadCommunication { op : gateway_payload . op , d : gateway_payload . s . unwrap ( ) } ;
self . heartbeat_handler . as_mut ( ) . unwrap ( ) . tx . send ( heartbeat_communication ) . await . unwrap ( ) ;
}
}
}
}
2023-05-05 18:38:04 +02:00
/**
Handles sending heartbeats to the gateway in another thread
* /
struct HeartbeatHandler {
/// The heartbeat interval in milliseconds
heartbeat_interval : u128 ,
tx : Sender < HeartbeatThreadCommunication > ,
}
impl HeartbeatHandler {
2023-05-07 11:58:12 +02:00
pub fn new ( heartbeat_interval : u128 , websocket_tx : Arc < Mutex < SplitSink < WebSocketStream < MaybeTlsStream < TcpStream > > , tokio_tungstenite ::tungstenite ::Message > > > ) -> HeartbeatHandler {
let ( tx , mut rx ) = mpsc ::channel ( 32 ) ;
2023-05-05 18:38:04 +02:00
task ::spawn ( async move {
let mut last_heartbeat : Instant = time ::Instant ::now ( ) ;
let mut last_seq_number : Option < u64 > = None ;
loop {
// If we received a seq number update, use that as the last seq number
2023-05-07 11:58:12 +02:00
let hb_communication : Result < HeartbeatThreadCommunication , TryRecvError > = rx . try_recv ( ) ;
if hb_communication . is_ok ( ) {
2023-05-05 18:38:04 +02:00
last_seq_number = Some ( hb_communication . unwrap ( ) . d ) ;
}
if last_heartbeat . elapsed ( ) . as_millis ( ) > heartbeat_interval {
2023-05-07 12:44:11 +02:00
println! ( " GW: Sending Heartbeat.. " ) ;
2023-05-06 11:14:38 +02:00
2023-05-05 18:38:04 +02:00
let heartbeat = GatewayHeartbeat {
op : 1 ,
d : last_seq_number
} ;
let heartbeat_json = serde_json ::to_string ( & heartbeat ) . unwrap ( ) ;
let msg = tokio_tungstenite ::tungstenite ::Message ::text ( heartbeat_json ) ;
2023-05-13 09:47:12 +02:00
websocket_tx . lock ( ) . await
2023-05-05 18:38:04 +02:00
. send ( msg )
. await
. unwrap ( ) ;
last_heartbeat = time ::Instant ::now ( ) ;
}
}
} ) ;
Self { heartbeat_interval , tx }
}
}
/**
Used to communicate with the main thread .
Either signifies a sequence number update or a received heartbeat ack
* /
#[ derive(Clone, Copy, Debug) ]
struct HeartbeatThreadCommunication {
/// An opcode for the communication we received
op : u8 ,
/// The sequence number we got from discord
d : u64
}
2023-04-28 13:40:29 +02:00
/**
Trait which defines the behaviour of an Observer . An Observer is an object which is subscribed to
an Observable . The Observer is notified when the Observable ' s data changes .
2023-04-28 18:18:32 +02:00
In this case , the Observable is a [ ` GatewayEvent ` ] , which is a wrapper around a WebSocketEvent .
2023-04-28 13:40:29 +02:00
* /
2023-05-11 22:47:31 +02:00
pub trait Observer < T : WebSocketEvent > : std ::fmt ::Debug {
2023-04-28 12:31:59 +02:00
fn update ( & self , data : & T ) ;
2023-04-27 17:57:10 +02:00
}
2023-04-28 13:40:29 +02:00
/** GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
2023-04-28 18:18:32 +02:00
change in the WebSocketEvent . GatewayEvents are observable .
* /
2023-05-11 22:47:31 +02:00
#[ derive(Default, Debug) ]
pub struct GatewayEvent < T : WebSocketEvent > {
observers : Vec < Arc < Mutex < dyn Observer < T > + Sync + Send > > > ,
2023-04-28 12:31:59 +02:00
pub event_data : T ,
2023-04-27 22:29:07 +02:00
pub is_observed : bool ,
2023-04-27 17:57:10 +02:00
}
2023-05-11 22:47:31 +02:00
impl < T : WebSocketEvent > GatewayEvent < T > {
2023-04-28 12:31:59 +02:00
fn new ( event_data : T ) -> Self {
2023-04-27 17:57:10 +02:00
Self {
2023-04-27 22:29:07 +02:00
is_observed : false ,
2023-04-27 17:57:10 +02:00
observers : Vec ::new ( ) ,
2023-04-28 12:31:59 +02:00
event_data ,
2023-04-27 17:57:10 +02:00
}
}
2023-04-28 13:40:29 +02:00
/**
Returns true if the GatewayEvent is observed by at least one Observer .
2023-04-28 18:18:32 +02:00
* /
2023-04-27 22:29:07 +02:00
pub fn is_observed ( & self ) -> bool {
self . is_observed
}
2023-04-28 13:40:29 +02:00
/**
Subscribes an Observer to the GatewayEvent . Returns an error if the GatewayEvent is already
observed .
# Errors
Returns an error if the GatewayEvent is already observed .
Error type : [ ` ObserverError ::AlreadySubscribedError ` ]
2023-04-28 18:18:32 +02:00
* /
2023-05-11 22:47:31 +02:00
pub fn subscribe ( & mut self , observable : Arc < Mutex < dyn Observer < T > + Sync + Send > > ) -> Option < ObserverError > {
2023-04-27 22:29:07 +02:00
if self . is_observed {
2023-04-27 22:38:41 +02:00
return Some ( ObserverError ::AlreadySubscribedError ) ;
2023-04-27 22:29:07 +02:00
}
self . is_observed = true ;
2023-04-27 22:38:41 +02:00
self . observers . push ( observable ) ;
None
2023-04-27 17:57:10 +02:00
}
2023-04-28 13:40:29 +02:00
/**
Unsubscribes an Observer from the GatewayEvent .
2023-04-28 18:18:32 +02:00
* /
2023-05-11 22:47:31 +02:00
pub fn unsubscribe ( & mut self , observable : Arc < Mutex < dyn Observer < T > + Sync + Send > > ) {
2023-04-28 12:31:59 +02:00
// .retain()'s closure retains only those elements of the vector, which have a different
// pointer value than observable.
2023-05-11 22:47:31 +02:00
// The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
// anddd there is no way to do that without using format
self . observers . retain ( | obs | ! ( format! ( " {:?} " , obs ) = = format! ( " {:?} " , & observable ) ) ) ;
2023-04-28 12:31:59 +02:00
self . is_observed = ! self . observers . is_empty ( ) ;
2023-04-27 17:57:10 +02:00
}
2023-04-28 13:40:29 +02:00
/**
Updates the GatewayEvent ' s data and notifies the observers .
2023-04-28 18:18:32 +02:00
* /
2023-05-11 22:47:31 +02:00
async fn update_data ( & mut self , new_event_data : T ) {
2023-04-28 12:31:59 +02:00
self . event_data = new_event_data ;
2023-05-11 22:47:31 +02:00
self . notify ( ) . await ;
2023-04-27 17:57:10 +02:00
}
2023-04-28 13:40:29 +02:00
/**
Notifies the observers of the GatewayEvent .
2023-04-28 18:18:32 +02:00
* /
2023-05-11 22:47:31 +02:00
async fn notify ( & self ) {
2023-04-27 17:57:10 +02:00
for observer in & self . observers {
2023-05-11 22:47:31 +02:00
observer . lock ( ) . await . update ( & self . event_data ) ;
2023-04-28 12:31:59 +02:00
}
}
}
2023-04-28 18:18:32 +02:00
mod events {
use super ::* ;
2023-05-11 22:47:31 +02:00
#[ derive(Default, Debug) ]
pub struct Events {
pub message : Message ,
pub user : User ,
2023-05-13 21:27:44 +02:00
pub channel : Channel ,
pub thread : Thread ,
pub guild : Guild ,
2023-05-14 08:20:25 +02:00
pub call : Call ,
2023-05-11 22:47:31 +02:00
pub gateway_identify_payload : GatewayEvent < GatewayIdentifyPayload > ,
pub gateway_resume : GatewayEvent < GatewayResume > ,
2023-04-28 18:18:32 +02:00
}
2023-05-11 22:47:31 +02:00
#[ derive(Default, Debug) ]
pub struct Message {
pub create : GatewayEvent < MessageCreate > ,
pub update : GatewayEvent < MessageUpdate > ,
pub delete : GatewayEvent < MessageDelete > ,
pub delete_bulk : GatewayEvent < MessageDeleteBulk > ,
pub reaction_add : GatewayEvent < MessageReactionAdd > ,
pub reaction_remove : GatewayEvent < MessageReactionRemove > ,
pub reaction_remove_all : GatewayEvent < MessageReactionRemoveAll > ,
pub reaction_remove_emoji : GatewayEvent < MessageReactionRemoveEmoji > ,
2023-04-28 18:18:32 +02:00
}
2023-05-11 22:47:31 +02:00
#[ derive(Default, Debug) ]
pub struct User {
2023-05-13 21:27:44 +02:00
pub update : GatewayEvent < UserUpdate > ,
2023-05-11 22:47:31 +02:00
pub presence_update : GatewayEvent < PresenceUpdate > ,
pub typing_start_event : GatewayEvent < TypingStartEvent > ,
2023-04-28 18:18:32 +02:00
}
2023-05-13 21:27:44 +02:00
#[ derive(Default, Debug) ]
pub struct Channel {
pub create : GatewayEvent < ChannelCreate > ,
pub update : GatewayEvent < ChannelUpdate > ,
pub delete : GatewayEvent < ChannelDelete > ,
pub pins_update : GatewayEvent < ChannelPinsUpdate >
}
#[ derive(Default, Debug) ]
pub struct Thread {
pub create : GatewayEvent < ThreadCreate > ,
pub update : GatewayEvent < ThreadUpdate > ,
pub delete : GatewayEvent < ThreadDelete > ,
pub list_sync : GatewayEvent < ThreadListSync > ,
pub member_update : GatewayEvent < ThreadMemberUpdate > ,
pub members_update : GatewayEvent < ThreadMembersUpdate > ,
}
#[ derive(Default, Debug) ]
pub struct Guild {
pub create : GatewayEvent < GuildCreate > ,
2023-05-14 08:20:25 +02:00
pub update : GatewayEvent < GuildUpdate > ,
pub delete : GatewayEvent < GuildDelete > ,
//pub audit_log_entry_create: GatewayEvent<ThreadCreate>,
pub ban_add : GatewayEvent < GuildBanAdd > ,
pub ban_remove : GatewayEvent < GuildBanRemove > ,
pub emojis_update : GatewayEvent < GuildEmojisUpdate > ,
/* pub stickers_update: GatewayEvent<ThreadCreate>,
2023-05-13 21:27:44 +02:00
pub integrations_update : GatewayEvent < ThreadCreate > ,
pub member_add : GatewayEvent < ThreadCreate > ,
pub member_remove : GatewayEvent < ThreadCreate > ,
pub member_update : GatewayEvent < ThreadCreate > ,
pub members_chunk : GatewayEvent < ThreadCreate > ,
pub role_create : GatewayEvent < ThreadCreate > ,
pub role_update : GatewayEvent < ThreadCreate > ,
pub role_delete : GatewayEvent < ThreadCreate > ,
pub role_scheduled_event_create : GatewayEvent < ThreadCreate > ,
pub role_scheduled_event_update : GatewayEvent < ThreadCreate > ,
pub role_scheduled_event_delete : GatewayEvent < ThreadCreate > ,
pub role_scheduled_event_user_add : GatewayEvent < ThreadCreate > ,
pub role_scheduled_event_user_remove : GatewayEvent < ThreadCreate > , * /
}
2023-05-14 08:20:25 +02:00
#[ derive(Default, Debug) ]
pub struct Call {
pub create : GatewayEvent < CallCreate >
}
2023-04-28 18:18:32 +02:00
}
2023-04-28 12:31:59 +02:00
#[ cfg(test) ]
2023-04-28 12:39:58 +02:00
mod example {
2023-04-28 12:31:59 +02:00
use super ::* ;
use crate ::api ::types ::GatewayResume ;
2023-05-11 22:47:31 +02:00
#[ derive(Debug) ]
2023-04-28 12:31:59 +02:00
struct Consumer ;
impl Observer < GatewayResume > for Consumer {
fn update ( & self , data : & GatewayResume ) {
println! ( " {} " , data . token )
}
}
2023-05-11 22:47:31 +02:00
#[ tokio::test ]
async fn test_observer_behaviour ( ) {
2023-04-28 12:31:59 +02:00
let mut event = GatewayEvent ::new ( GatewayResume {
token : " start " . to_string ( ) ,
session_id : " start " . to_string ( ) ,
seq : " start " . to_string ( ) ,
} ) ;
let new_data = GatewayResume {
token : " token_3276ha37am3 " . to_string ( ) ,
session_id : " 89346671230 " . to_string ( ) ,
seq : " 3 " . to_string ( ) ,
} ;
let consumer = Consumer ;
2023-05-13 15:36:29 +02:00
let arc_mut_consumer = Arc ::new ( Mutex ::new ( consumer ) ) ;
2023-04-28 12:31:59 +02:00
2023-05-13 15:36:29 +02:00
event . subscribe ( arc_mut_consumer . clone ( ) ) ;
2023-04-28 12:31:59 +02:00
2023-05-11 22:47:31 +02:00
event . notify ( ) . await ;
2023-04-28 12:31:59 +02:00
2023-05-11 22:47:31 +02:00
event . update_data ( new_data ) . await ;
2023-04-28 12:31:59 +02:00
let second_consumer = Consumer ;
2023-05-13 15:36:29 +02:00
let arc_mut_second_consumer = Arc ::new ( Mutex ::new ( second_consumer ) ) ;
2023-04-28 12:31:59 +02:00
2023-05-13 15:36:29 +02:00
match event . subscribe ( arc_mut_second_consumer . clone ( ) ) {
2023-04-28 12:39:58 +02:00
None = > assert! ( false ) ,
2023-04-28 12:31:59 +02:00
Some ( err ) = > println! ( " You cannot subscribe twice: {} " , err ) ,
2023-04-27 17:57:10 +02:00
}
2023-05-13 15:36:29 +02:00
event . unsubscribe ( arc_mut_consumer . clone ( ) ) ;
match event . subscribe ( arc_mut_second_consumer . clone ( ) ) {
None = > assert! ( true ) ,
2023-05-13 16:35:42 +02:00
Some ( _ ) = > assert! ( false ) ,
2023-05-13 15:36:29 +02:00
}
2023-04-27 17:57:10 +02:00
}
2023-04-28 23:21:55 +02:00
#[ tokio::test ]
2023-05-13 15:36:29 +02:00
async fn test_gateway_establish ( ) {
2023-05-09 20:35:53 +02:00
let _gateway = Gateway ::new ( " ws://localhost:3001/ " . to_string ( ) )
2023-04-28 23:21:55 +02:00
. await
. unwrap ( ) ;
}
2023-04-25 17:21:27 +02:00
}