Gateway opcode constants and small refactor

This commit is contained in:
kozabrada123 2023-05-28 14:39:41 +02:00
parent d233b82243
commit da27692b53
2 changed files with 214 additions and 150 deletions

View File

@ -5,6 +5,7 @@ use futures_util::stream::SplitSink;
use futures_util::SinkExt; use futures_util::SinkExt;
use futures_util::StreamExt; use futures_util::StreamExt;
use native_tls::TlsConnector; use native_tls::TlsConnector;
use tokio::task::JoinHandle;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -17,6 +18,54 @@ use tokio::time::Instant;
use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::{connect_async_tls_with_config, Connector, WebSocketStream}; use tokio_tungstenite::{connect_async_tls_with_config, Connector, WebSocketStream};
// Gateway opcodes
/// Opcode received when the server dispatches a [crate::types::WebSocketEvent]
const GATEWAY_DISPATCH: u8 = 0;
/// Opcode sent when sending a heartbeat
const GATEWAY_HEARTBEAT: u8 = 1;
/// Opcode sent to initiate a session
///
/// See [types::GatewayIdentifyPayload]
const GATEWAY_IDENTIFY: u8 = 2;
/// Opcode sent to update our presence
///
/// See [types::GatewayUpdatePresence]
const GATEWAY_UPDATE_PRESENCE: u8 = 3;
/// Opcode sent to update our state in vc
///
/// Like muting, deafening, leaving, joining..
///
/// See [types::UpdateVoiceState]
const GATEWAY_UPDATE_VOICE_STATE: u8 = 4;
/// Opcode sent to resume a session
///
/// See [types::GatewayResume]
const GATEWAY_RESUME: u8 = 6;
/// Opcode received to tell the client to reconnect
const GATEWAY_RECONNECT: u8 = 7;
/// Opcode sent to request guild member data
///
/// See [types::GatewayRequestGuildMembers]
const GATEWAY_REQUEST_GUILD_MEMBERS: u8 = 8;
/// Opcode received to tell the client their token / session is invalid
const GATEWAY_INVALID_SESSION: u8 = 9;
/// Opcode received when initially connecting to the gateway, starts our heartbeat
///
/// See [types::HelloData]
const GATEWAY_HELLO: u8 = 10;
/// Opcode received to acknowledge a heartbeat
const GATEWAY_HEARTBEAT_ACK: u8 = 11;
/// Opcode sent to get the voice state of users in a given DM/group channel
///
/// See [types::CallSync]
const GATEWAY_CALL_SYNC: u8 = 13;
/// Opcode sent to get data for a server (Lazy Loading request)
///
/// Sent by the official client when switching to a server
///
/// See [types::LazyRequest]
const GATEWAY_LAZY_REQUEST: u8 = 14;
#[derive(Debug)] #[derive(Debug)]
/** /**
Represents a handle to a Gateway connection. A Gateway connection will create observable Represents a handle to a Gateway connection. A Gateway connection will create observable
@ -27,7 +76,7 @@ Using this handle you can also send Gateway Events directly.
pub struct GatewayHandle { pub struct GatewayHandle {
pub url: String, pub url: String,
pub events: Arc<Mutex<Events>>, pub events: Arc<Mutex<Events>>,
pub websocket_tx: Arc< pub websocket_send: Arc<
Mutex< Mutex<
SplitSink< SplitSink<
WebSocketStream<MaybeTlsStream<TcpStream>>, WebSocketStream<MaybeTlsStream<TcpStream>>,
@ -35,23 +84,24 @@ pub struct GatewayHandle {
>, >,
>, >,
>, >,
pub handle: JoinHandle<()>
} }
impl GatewayHandle { impl GatewayHandle {
/// Sends json to the gateway with an opcode /// Sends json to the gateway with an opcode
async fn send_json_event(&self, op: u8, to_send: serde_json::Value) { async fn send_json_event(&self, op_code: u8, to_send: serde_json::Value) {
let gateway_payload = types::GatewaySendPayload { let gateway_payload = types::GatewaySendPayload {
op, op_code,
d: Some(to_send), event_data: Some(to_send),
s: None sequence_number: None
}; };
let payload_json = serde_json::to_string(&gateway_payload).unwrap(); let payload_json = serde_json::to_string(&gateway_payload).unwrap();
let message = tokio_tungstenite::tungstenite::Message::text(payload_json); let message = tokio_tungstenite::tungstenite::Message::text(payload_json);
self.websocket_tx.lock().await.send(message).await.unwrap(); self.websocket_send.lock().await.send(message).await.unwrap();
} }
/// Sends an identify event to the gateway /// Sends an identify event to the gateway
@ -60,7 +110,7 @@ impl GatewayHandle {
println!("GW: Sending Identify.."); println!("GW: Sending Identify..");
self.send_json_event(2, to_send_value).await; self.send_json_event(GATEWAY_IDENTIFY, to_send_value).await;
} }
/// Sends a resume event to the gateway /// Sends a resume event to the gateway
@ -69,7 +119,7 @@ impl GatewayHandle {
println!("GW: Sending Resume.."); println!("GW: Sending Resume..");
self.send_json_event(6, to_send_value).await; self.send_json_event(GATEWAY_RESUME, to_send_value).await;
} }
/// Sends an update presence event to the gateway /// Sends an update presence event to the gateway
@ -78,7 +128,7 @@ impl GatewayHandle {
println!("GW: Sending Presence Update.."); println!("GW: Sending Presence Update..");
self.send_json_event(3, to_send_value).await; self.send_json_event(GATEWAY_UPDATE_PRESENCE, to_send_value).await;
} }
/// Sends a request guild members to the server /// Sends a request guild members to the server
@ -87,7 +137,7 @@ impl GatewayHandle {
println!("GW: Sending Request Guild Members.."); println!("GW: Sending Request Guild Members..");
self.send_json_event(8, to_send_value).await; self.send_json_event(GATEWAY_REQUEST_GUILD_MEMBERS, to_send_value).await;
} }
/// Sends an update voice state to the server /// Sends an update voice state to the server
@ -97,7 +147,7 @@ impl GatewayHandle {
println!("GW: Sending Update Voice State.."); println!("GW: Sending Update Voice State..");
self.send_json_event(4, to_send_value).await; self.send_json_event(GATEWAY_UPDATE_VOICE_STATE, to_send_value).await;
} }
/// Sends a call sync to the server /// Sends a call sync to the server
@ -107,7 +157,7 @@ impl GatewayHandle {
println!("GW: Sending Call Sync.."); println!("GW: Sending Call Sync..");
self.send_json_event(13, to_send_value).await; self.send_json_event(GATEWAY_CALL_SYNC, to_send_value).await;
} }
/// Sends a Lazy Request /// Sends a Lazy Request
@ -117,14 +167,14 @@ impl GatewayHandle {
println!("GW: Sending Lazy Request.."); println!("GW: Sending Lazy Request..");
self.send_json_event(14, to_send_value).await; self.send_json_event(GATEWAY_LAZY_REQUEST, to_send_value).await;
} }
} }
pub struct Gateway { pub struct Gateway {
pub events: Arc<Mutex<Events>>, pub events: Arc<Mutex<Events>>,
heartbeat_handler: Option<HeartbeatHandler>, heartbeat_handler: Option<HeartbeatHandler>,
pub websocket_tx: Arc< pub websocket_send: Arc<
Mutex< Mutex<
SplitSink< SplitSink<
WebSocketStream<MaybeTlsStream<TcpStream>>, WebSocketStream<MaybeTlsStream<TcpStream>>,
@ -138,7 +188,7 @@ impl Gateway {
pub async fn new( pub async fn new(
websocket_url: String, websocket_url: String,
) -> Result<GatewayHandle, tokio_tungstenite::tungstenite::Error> { ) -> Result<GatewayHandle, tokio_tungstenite::tungstenite::Error> {
let (ws_stream, _) = match connect_async_tls_with_config( let (websocket_stream, _) = match connect_async_tls_with_config(
&websocket_url, &websocket_url,
None, None,
false, false,
@ -148,45 +198,45 @@ impl Gateway {
) )
.await .await
{ {
Ok(ws_stream) => ws_stream, Ok(websocket_stream) => websocket_stream,
Err(e) => return Err(e), Err(e) => return Err(e),
}; };
let (ws_tx, mut ws_rx) = ws_stream.split(); let (gateway_send, mut gateway_receive) = websocket_stream.split();
let shared_tx = Arc::new(Mutex::new(ws_tx)); let shared_gateway_send = Arc::new(Mutex::new(gateway_send));
let mut gateway = Gateway { let mut gateway = Gateway {
events: Arc::new(Mutex::new(Events::default())), events: Arc::new(Mutex::new(Events::default())),
heartbeat_handler: None, heartbeat_handler: None,
websocket_tx: shared_tx.clone(), websocket_send: shared_gateway_send.clone(),
}; };
let shared_events = gateway.events.clone(); let shared_events = gateway.events.clone();
// Wait for the first hello and then spawn both tasks so we avoid nested tasks // Wait for the first hello and then spawn both tasks so we avoid nested tasks
// This automatically spawns the heartbeat task, but from the main thread // This automatically spawns the heartbeat task, but from the main thread
let msg = ws_rx.next().await.unwrap().unwrap(); let msg = gateway_receive.next().await.unwrap().unwrap();
let gateway_payload: types::GatewayReceivePayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); let gateway_payload: types::GatewayReceivePayload = serde_json::from_str(msg.to_text().unwrap()).unwrap();
if gateway_payload.op != 10 { if gateway_payload.op_code != GATEWAY_HELLO {
println!("Recieved non hello on gateway init, what is happening?"); println!("Received non hello on gateway init, what is happening?");
return Err(tokio_tungstenite::tungstenite::Error::Protocol( return Err(tokio_tungstenite::tungstenite::Error::Protocol(
tokio_tungstenite::tungstenite::error::ProtocolError::InvalidOpcode( tokio_tungstenite::tungstenite::error::ProtocolError::InvalidOpcode(
gateway_payload.op, gateway_payload.op_code,
), ),
)); ));
} }
println!("GW: Received Hello"); println!("GW: Received Hello");
let gateway_hello: types::HelloData = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let gateway_hello: types::HelloData = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
gateway.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, shared_tx.clone())); gateway.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, shared_gateway_send.clone()));
// Now we can continously check for messages in a different task, since we aren't going to receive another hello // Now we can continuously check for messages in a different task, since we aren't going to receive another hello
task::spawn(async move { let handle: JoinHandle<()> = task::spawn(async move {
loop { loop {
let msg = ws_rx.next().await; let msg = gateway_receive.next().await;
if msg.as_ref().is_some() { if msg.as_ref().is_some() {
let msg_unwrapped = msg.unwrap().unwrap(); let msg_unwrapped = msg.unwrap().unwrap();
gateway.handle_event(msg_unwrapped).await; gateway.handle_event(msg_unwrapped).await;
@ -197,7 +247,8 @@ impl Gateway {
return Ok(GatewayHandle { return Ok(GatewayHandle {
url: websocket_url.clone(), url: websocket_url.clone(),
events: shared_events, events: shared_events,
websocket_tx: shared_tx.clone(), websocket_send: shared_gateway_send.clone(),
handle,
}); });
} }
@ -210,347 +261,341 @@ impl Gateway {
let gateway_payload: types::GatewayReceivePayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); let gateway_payload: types::GatewayReceivePayload = serde_json::from_str(msg.to_text().unwrap()).unwrap();
// See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes
match gateway_payload.op { match gateway_payload.op_code {
// Dispatch
// An event was dispatched, we need to look at the gateway event name t // An event was dispatched, we need to look at the gateway event name t
0 => { GATEWAY_DISPATCH => {
let gateway_payload_t = gateway_payload.clone().t.unwrap(); let gateway_payload_t = gateway_payload.clone().event_name.unwrap();
println!("GW: Received {}..", gateway_payload_t); println!("GW: Received {}..", gateway_payload_t);
//println!("Event data dump: {}", gateway_payload.d.clone().unwrap().get()); //println!("Event data dump: {}", gateway_payload.d.clone().unwrap().get());
// See https://discord.com/developers/docs/topics/gateway-events#receive-events // See https://discord.com/developers/docs/topics/gateway-events#receive-events
// "Some" of these are uncodumented // "Some" of these are undocumented
match gateway_payload_t.as_str() { match gateway_payload_t.as_str() {
"READY" => { "READY" => {
let new_data: types::GatewayReady = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GatewayReady = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.session.ready.update_data(new_data).await; self.events.lock().await.session.ready.update_data(new_data).await;
}, },
"READY_SUPPLEMENTAL" => { "READY_SUPPLEMENTAL" => {
let new_data: types::GatewayReadySupplemental = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GatewayReadySupplemental = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.session.ready_supplimental.update_data(new_data).await; self.events.lock().await.session.ready_supplemental.update_data(new_data).await;
} }
"RESUMED" => {} "RESUMED" => {}
"APPLICATION_COMMAND_PERMISSIONS_UPDATE" => { "APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {
let new_data: types::ApplicationCommandPermissionsUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ApplicationCommandPermissionsUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.application.command_permissions_update.update_data(new_data).await; self.events.lock().await.application.command_permissions_update.update_data(new_data).await;
} }
"AUTO_MODERATION_RULE_CREATE" => { "AUTO_MODERATION_RULE_CREATE" => {
let new_data: types::AutoModerationRuleCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::AutoModerationRuleCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.auto_moderation.rule_create.update_data(new_data).await; self.events.lock().await.auto_moderation.rule_create.update_data(new_data).await;
} }
"AUTO_MODERATION_RULE_UPDATE" => { "AUTO_MODERATION_RULE_UPDATE" => {
let new_data: types::AutoModerationRuleUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::AutoModerationRuleUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.auto_moderation.rule_update.update_data(new_data).await; self.events.lock().await.auto_moderation.rule_update.update_data(new_data).await;
} }
"AUTO_MODERATION_RULE_DELETE" => { "AUTO_MODERATION_RULE_DELETE" => {
let new_data: types::AutoModerationRuleDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::AutoModerationRuleDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.auto_moderation.rule_delete.update_data(new_data).await; self.events.lock().await.auto_moderation.rule_delete.update_data(new_data).await;
} }
"AUTO_MODERATION_ACTION_EXECUTION" => { "AUTO_MODERATION_ACTION_EXECUTION" => {
let new_data: types::AutoModerationActionExecution = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::AutoModerationActionExecution = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.auto_moderation.action_execution.update_data(new_data).await; self.events.lock().await.auto_moderation.action_execution.update_data(new_data).await;
} }
"CHANNEL_CREATE" => { "CHANNEL_CREATE" => {
let new_data: types::ChannelCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ChannelCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.channel.create.update_data(new_data).await; self.events.lock().await.channel.create.update_data(new_data).await;
} }
"CHANNEL_UPDATE" => { "CHANNEL_UPDATE" => {
let new_data: types::ChannelUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ChannelUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.channel.update.update_data(new_data).await; self.events.lock().await.channel.update.update_data(new_data).await;
} }
"CHANNEL_UNREAD_UPDATE" => { "CHANNEL_UNREAD_UPDATE" => {
let new_data: types::ChannelUnreadUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ChannelUnreadUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.channel.unread_update.update_data(new_data).await; self.events.lock().await.channel.unread_update.update_data(new_data).await;
} }
"CHANNEL_DELETE" => { "CHANNEL_DELETE" => {
let new_data: types::ChannelDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ChannelDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.channel.delete.update_data(new_data).await; self.events.lock().await.channel.delete.update_data(new_data).await;
} }
"CHANNEL_PINS_UPDATE" => { "CHANNEL_PINS_UPDATE" => {
let new_data: types::ChannelPinsUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ChannelPinsUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.channel.pins_update.update_data(new_data).await; self.events.lock().await.channel.pins_update.update_data(new_data).await;
} }
"CALL_CREATE" => { "CALL_CREATE" => {
let new_data: types::CallCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::CallCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.call.create.update_data(new_data).await; self.events.lock().await.call.create.update_data(new_data).await;
}, },
"CALL_UPDATE" => { "CALL_UPDATE" => {
let new_data: types::CallUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::CallUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.call.update.update_data(new_data).await; self.events.lock().await.call.update.update_data(new_data).await;
} }
"CALL_DELETE" => { "CALL_DELETE" => {
let new_data: types::CallDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::CallDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.call.delete.update_data(new_data).await; self.events.lock().await.call.delete.update_data(new_data).await;
} }
"THREAD_CREATE" => { "THREAD_CREATE" => {
let new_data: types::ThreadCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ThreadCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.thread.create.update_data(new_data).await; self.events.lock().await.thread.create.update_data(new_data).await;
} }
"THREAD_UPDATE" => { "THREAD_UPDATE" => {
let new_data: types::ThreadUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ThreadUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.thread.update.update_data(new_data).await; self.events.lock().await.thread.update.update_data(new_data).await;
} }
"THREAD_DELETE" => { "THREAD_DELETE" => {
let new_data: types::ThreadDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ThreadDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.thread.delete.update_data(new_data).await; self.events.lock().await.thread.delete.update_data(new_data).await;
} }
"THREAD_LIST_SYNC" => { "THREAD_LIST_SYNC" => {
let new_data: types::ThreadListSync = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ThreadListSync = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.thread.list_sync.update_data(new_data).await; self.events.lock().await.thread.list_sync.update_data(new_data).await;
} }
"THREAD_MEMBER_UPDATE" => { "THREAD_MEMBER_UPDATE" => {
let new_data: types::ThreadMemberUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ThreadMemberUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.thread.member_update.update_data(new_data).await; self.events.lock().await.thread.member_update.update_data(new_data).await;
} }
"THREAD_MEMBERS_UPDATE" => { "THREAD_MEMBERS_UPDATE" => {
let new_data: types::ThreadMembersUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::ThreadMembersUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.thread.members_update.update_data(new_data).await; self.events.lock().await.thread.members_update.update_data(new_data).await;
} }
"GUILD_CREATE" => { "GUILD_CREATE" => {
let new_data: types::GuildCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.create.update_data(new_data).await; self.events.lock().await.guild.create.update_data(new_data).await;
} }
"GUILD_UPDATE" => { "GUILD_UPDATE" => {
let new_data: types::GuildUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.update.update_data(new_data).await; self.events.lock().await.guild.update.update_data(new_data).await;
} }
"GUILD_DELETE" => { "GUILD_DELETE" => {
let new_data: types::GuildDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.delete.update_data(new_data).await; self.events.lock().await.guild.delete.update_data(new_data).await;
} }
"GUILD_AUDIT_LOG_ENTRY_CREATE" => { "GUILD_AUDIT_LOG_ENTRY_CREATE" => {
let new_data: types::GuildAuditLogEntryCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildAuditLogEntryCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.audit_log_entry_create.update_data(new_data).await; self.events.lock().await.guild.audit_log_entry_create.update_data(new_data).await;
} }
"GUILD_BAN_ADD" => { "GUILD_BAN_ADD" => {
let new_data: types::GuildBanAdd = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildBanAdd = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.ban_add.update_data(new_data).await; self.events.lock().await.guild.ban_add.update_data(new_data).await;
} }
"GUILD_BAN_REMOVE" => { "GUILD_BAN_REMOVE" => {
let new_data: types::GuildBanRemove = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildBanRemove = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.ban_remove.update_data(new_data).await; self.events.lock().await.guild.ban_remove.update_data(new_data).await;
} }
"GUILD_EMOJIS_UPDATE" => { "GUILD_EMOJIS_UPDATE" => {
let new_data: types::GuildEmojisUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildEmojisUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.emojis_update.update_data(new_data).await; self.events.lock().await.guild.emojis_update.update_data(new_data).await;
} }
"GUILD_STICKERS_UPDATE" => { "GUILD_STICKERS_UPDATE" => {
let new_data: types::GuildStickersUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildStickersUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.stickers_update.update_data(new_data).await; self.events.lock().await.guild.stickers_update.update_data(new_data).await;
} }
"GUILD_INTEGRATIONS_UPDATE" => { "GUILD_INTEGRATIONS_UPDATE" => {
let new_data: types::GuildIntegrationsUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildIntegrationsUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.integrations_update.update_data(new_data).await; self.events.lock().await.guild.integrations_update.update_data(new_data).await;
} }
"GUILD_MEMBER_ADD" => { "GUILD_MEMBER_ADD" => {
let new_data: types::GuildMemberAdd = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildMemberAdd = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.member_add.update_data(new_data).await; self.events.lock().await.guild.member_add.update_data(new_data).await;
} }
"GUILD_MEMBER_REMOVE" => { "GUILD_MEMBER_REMOVE" => {
let new_data: types::GuildMemberRemove = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildMemberRemove = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.member_remove.update_data(new_data).await; self.events.lock().await.guild.member_remove.update_data(new_data).await;
} }
"GUILD_MEMBER_UPDATE" => { "GUILD_MEMBER_UPDATE" => {
let new_data: types::GuildMemberUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildMemberUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.member_update.update_data(new_data).await; self.events.lock().await.guild.member_update.update_data(new_data).await;
} }
"GUILD_MEMBERS_CHUNK" => { "GUILD_MEMBERS_CHUNK" => {
let new_data: types::GuildMembersChunk = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildMembersChunk = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.members_chunk.update_data(new_data).await; self.events.lock().await.guild.members_chunk.update_data(new_data).await;
} }
"GUILD_ROLE_CREATE" => { "GUILD_ROLE_CREATE" => {
let new_data: types::GuildRoleCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildRoleCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.role_create.update_data(new_data).await; self.events.lock().await.guild.role_create.update_data(new_data).await;
} }
"GUILD_ROLE_UPDATE" => { "GUILD_ROLE_UPDATE" => {
let new_data: types::GuildRoleUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildRoleUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.role_update.update_data(new_data).await; self.events.lock().await.guild.role_update.update_data(new_data).await;
} }
"GUILD_ROLE_DELETE" => { "GUILD_ROLE_DELETE" => {
let new_data: types::GuildRoleDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildRoleDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.role_delete.update_data(new_data).await; self.events.lock().await.guild.role_delete.update_data(new_data).await;
} }
"GUILD_SCHEDULED_EVENT_CREATE" => { "GUILD_SCHEDULED_EVENT_CREATE" => {
let new_data: types::GuildScheduledEventCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildScheduledEventCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.role_scheduled_event_create.update_data(new_data).await; self.events.lock().await.guild.role_scheduled_event_create.update_data(new_data).await;
} }
"GUILD_SCHEDULED_EVENT_UPDATE" => { "GUILD_SCHEDULED_EVENT_UPDATE" => {
let new_data: types::GuildScheduledEventUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildScheduledEventUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.role_scheduled_event_update.update_data(new_data).await; self.events.lock().await.guild.role_scheduled_event_update.update_data(new_data).await;
} }
"GUILD_SCHEDULED_EVENT_DELETE" => { "GUILD_SCHEDULED_EVENT_DELETE" => {
let new_data: types::GuildScheduledEventDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildScheduledEventDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.role_scheduled_event_delete.update_data(new_data).await; self.events.lock().await.guild.role_scheduled_event_delete.update_data(new_data).await;
} }
"GUILD_SCHEDULED_EVENT_USER_ADD" => { "GUILD_SCHEDULED_EVENT_USER_ADD" => {
let new_data: types::GuildScheduledEventUserAdd = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildScheduledEventUserAdd = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.role_scheduled_event_user_add.update_data(new_data).await; self.events.lock().await.guild.role_scheduled_event_user_add.update_data(new_data).await;
} }
"GUILD_SCHEDULED_EVENT_USER_REMOVE" => { "GUILD_SCHEDULED_EVENT_USER_REMOVE" => {
let new_data: types::GuildScheduledEventUserRemove = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::GuildScheduledEventUserRemove = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.role_scheduled_event_user_remove.update_data(new_data).await; self.events.lock().await.guild.role_scheduled_event_user_remove.update_data(new_data).await;
} }
"PASSIVE_UPDATE_V1" => { "PASSIVE_UPDATE_V1" => {
let new_data: types::PassiveUpdateV1 = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::PassiveUpdateV1 = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.guild.passive_update_v1.update_data(new_data).await; self.events.lock().await.guild.passive_update_v1.update_data(new_data).await;
} }
"INTEGRATION_CREATE" => { "INTEGRATION_CREATE" => {
let new_data: types::IntegrationCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::IntegrationCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.integration.create.update_data(new_data).await; self.events.lock().await.integration.create.update_data(new_data).await;
} }
"INTEGRATION_UPDATE" => { "INTEGRATION_UPDATE" => {
let new_data: types::IntegrationUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::IntegrationUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.integration.update.update_data(new_data).await; self.events.lock().await.integration.update.update_data(new_data).await;
} }
"INTEGRATION_DELETE" => { "INTEGRATION_DELETE" => {
let new_data: types::IntegrationDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::IntegrationDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.integration.delete.update_data(new_data).await; self.events.lock().await.integration.delete.update_data(new_data).await;
} }
"INTERACTION_CREATE" => { "INTERACTION_CREATE" => {
let new_data: types::InteractionCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::InteractionCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.interaction.create.update_data(new_data).await; self.events.lock().await.interaction.create.update_data(new_data).await;
} }
"INVITE_CREATE" => { "INVITE_CREATE" => {
let new_data: types::InviteCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::InviteCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.invite.create.update_data(new_data).await; self.events.lock().await.invite.create.update_data(new_data).await;
} }
"INVITE_DELETE" => { "INVITE_DELETE" => {
let new_data: types::InviteDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::InviteDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.invite.delete.update_data(new_data).await; self.events.lock().await.invite.delete.update_data(new_data).await;
} }
"MESSAGE_CREATE" => { "MESSAGE_CREATE" => {
let new_data: types::MessageCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.create.update_data(new_data).await; self.events.lock().await.message.create.update_data(new_data).await;
} }
"MESSAGE_UPDATE" => { "MESSAGE_UPDATE" => {
let new_data: types::MessageUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.update.update_data(new_data).await; self.events.lock().await.message.update.update_data(new_data).await;
} }
"MESSAGE_DELETE" => { "MESSAGE_DELETE" => {
let new_data: types::MessageDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.delete.update_data(new_data).await; self.events.lock().await.message.delete.update_data(new_data).await;
} }
"MESSAGE_DELETE_BULK" => { "MESSAGE_DELETE_BULK" => {
let new_data: types::MessageDeleteBulk = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageDeleteBulk = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.delete_bulk.update_data(new_data).await; self.events.lock().await.message.delete_bulk.update_data(new_data).await;
} }
"MESSAGE_REACTION_ADD" => { "MESSAGE_REACTION_ADD" => {
let new_data: types::MessageReactionAdd = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageReactionAdd = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.reaction_add.update_data(new_data).await; self.events.lock().await.message.reaction_add.update_data(new_data).await;
} }
"MESSAGE_REACTION_REMOVE" => { "MESSAGE_REACTION_REMOVE" => {
let new_data: types::MessageReactionRemove = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageReactionRemove = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.reaction_remove.update_data(new_data).await; self.events.lock().await.message.reaction_remove.update_data(new_data).await;
} }
"MESSAGE_REACTION_REMOVE_ALL" => { "MESSAGE_REACTION_REMOVE_ALL" => {
let new_data: types::MessageReactionRemoveAll = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageReactionRemoveAll = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.reaction_remove_all.update_data(new_data).await; self.events.lock().await.message.reaction_remove_all.update_data(new_data).await;
} }
"MESSAGE_REACTION_REMOVE_EMOJI" => { "MESSAGE_REACTION_REMOVE_EMOJI" => {
let new_data: types::MessageReactionRemoveEmoji= serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageReactionRemoveEmoji= serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.reaction_remove_emoji.update_data(new_data).await; self.events.lock().await.message.reaction_remove_emoji.update_data(new_data).await;
}, },
"MESSAGE_ACK" => { "MESSAGE_ACK" => {
let new_data: types::MessageACK = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::MessageACK = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.message.ack.update_data(new_data).await; self.events.lock().await.message.ack.update_data(new_data).await;
} }
"PRESENCE_UPDATE" => { "PRESENCE_UPDATE" => {
let new_data: types::PresenceUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::PresenceUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.user.presence_update.update_data(new_data).await; self.events.lock().await.user.presence_update.update_data(new_data).await;
} }
"RELATIONSHIP_ADD" => { "RELATIONSHIP_ADD" => {
let new_data: types::RelationshipAdd = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::RelationshipAdd = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.relationship.add.update_data(new_data).await; self.events.lock().await.relationship.add.update_data(new_data).await;
} }
"RELATIONSHIP_REMOVE" => { "RELATIONSHIP_REMOVE" => {
let new_data: types::RelationshipRemove = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::RelationshipRemove = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.relationship.remove.update_data(new_data).await; self.events.lock().await.relationship.remove.update_data(new_data).await;
} }
"STAGE_INSTANCE_CREATE" => { "STAGE_INSTANCE_CREATE" => {
let new_data: types::StageInstanceCreate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::StageInstanceCreate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.stage_instance.create.update_data(new_data).await; self.events.lock().await.stage_instance.create.update_data(new_data).await;
} }
"STAGE_INSTANCE_UPDATE" => { "STAGE_INSTANCE_UPDATE" => {
let new_data: types::StageInstanceUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::StageInstanceUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.stage_instance.update.update_data(new_data).await; self.events.lock().await.stage_instance.update.update_data(new_data).await;
} }
"STAGE_INSTANCE_DELETE" => { "STAGE_INSTANCE_DELETE" => {
let new_data: types::StageInstanceDelete = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::StageInstanceDelete = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.stage_instance.delete.update_data(new_data).await; self.events.lock().await.stage_instance.delete.update_data(new_data).await;
} }
"SESSIONS_REPLACE" => { "SESSIONS_REPLACE" => {
let sessions: Vec<types::Session> = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let sessions: Vec<types::Session> = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
let new_data = types::SessionsReplace {sessions}; let new_data = types::SessionsReplace {sessions};
self.events.lock().await.session.replace.update_data(new_data).await; self.events.lock().await.session.replace.update_data(new_data).await;
} }
"TYPING_START" => { "TYPING_START" => {
let new_data: types::TypingStartEvent = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::TypingStartEvent = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.user.typing_start_event.update_data(new_data).await; self.events.lock().await.user.typing_start_event.update_data(new_data).await;
} }
"USER_UPDATE" => { "USER_UPDATE" => {
let new_data: types::UserUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::UserUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.user.update.update_data(new_data).await; self.events.lock().await.user.update.update_data(new_data).await;
}, },
"USER_GUILD_SETTINGS_UPDATE" => { "USER_GUILD_SETTINGS_UPDATE" => {
let new_data: types::UserGuildSettingsUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::UserGuildSettingsUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.user.guild_settings_update.update_data(new_data).await; self.events.lock().await.user.guild_settings_update.update_data(new_data).await;
} }
"VOICE_STATE_UPDATE" => { "VOICE_STATE_UPDATE" => {
let new_data: types::VoiceStateUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::VoiceStateUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.voice.state_update.update_data(new_data).await; self.events.lock().await.voice.state_update.update_data(new_data).await;
} }
"VOICE_SERVER_UPDATE" => { "VOICE_SERVER_UPDATE" => {
let new_data: types::VoiceServerUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::VoiceServerUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.voice.server_update.update_data(new_data).await; self.events.lock().await.voice.server_update.update_data(new_data).await;
} }
"WEBHOOKS_UPDATE" => { "WEBHOOKS_UPDATE" => {
let new_data: types::WebhooksUpdate = serde_json::from_str(gateway_payload.d.unwrap().get()).unwrap(); let new_data: types::WebhooksUpdate = serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
self.events.lock().await.webhooks.update.update_data(new_data).await; self.events.lock().await.webhooks.update.update_data(new_data).await;
} }
_ => { _ => {
println!("Received unrecognised gateway event ({})! Please open an issue on the chorus github so we can implement it", &gateway_payload_t); println!("Received unrecognized gateway event ({})! Please open an issue on the chorus github so we can implement it", &gateway_payload_t);
} }
} }
} }
// Heartbeat
// We received a heartbeat from the server // We received a heartbeat from the server
1 => {} GATEWAY_HEARTBEAT => {}
// Reconnect GATEWAY_RECONNECT => {
7 => {
todo!() todo!()
} }
// Invalid Session GATEWAY_INVALID_SESSION => {
9 => {
todo!() todo!()
} }
// Hello
// Starts our heartbeat // Starts our heartbeat
// We should have already handled this in gateway init // We should have already handled this in gateway init
10 => { GATEWAY_HELLO => {
panic!("Recieved hello when it was unexpected"); panic!("Received hello when it was unexpected");
} }
// Heartbeat ACK GATEWAY_HEARTBEAT_ACK => {
11 => {
println!("GW: Received Heartbeat ACK"); println!("GW: Received Heartbeat ACK");
} }
2 | 3 | 4 | 6 | 8 => {panic!("Received gateway op code that's meant to be sent, not received ({})", gateway_payload.op)} GATEWAY_IDENTIFY | GATEWAY_UPDATE_PRESENCE | GATEWAY_UPDATE_VOICE_STATE | GATEWAY_RESUME | GATEWAY_REQUEST_GUILD_MEMBERS | GATEWAY_CALL_SYNC | GATEWAY_LAZY_REQUEST => {panic!("Received gateway op code that's meant to be sent, not received ({})", gateway_payload.op_code)}
_ => {println!("Received unrecognised gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op);} _ => {println!("Received unrecognized gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code);}
} }
// If we have an active heartbeat thread and we received a seq number we should let it know // If we have an active heartbeat thread and we received a seq number we should let it know
if gateway_payload.s.is_some() { if gateway_payload.sequence_number.is_some() {
if self.heartbeat_handler.is_some() { if self.heartbeat_handler.is_some() {
let heartbeat_communication = HeartbeatThreadCommunication { let heartbeat_communication = HeartbeatThreadCommunication {
op: gateway_payload.op, op_code: gateway_payload.op_code,
d: gateway_payload.s.unwrap(), sequence_number: gateway_payload.sequence_number.unwrap(),
}; };
self.heartbeat_handler self.heartbeat_handler
.as_mut() .as_mut()
.unwrap() .unwrap()
.tx .send
.send(heartbeat_communication) .send(heartbeat_communication)
.await .await
.unwrap(); .unwrap();
@ -564,8 +609,11 @@ Handles sending heartbeats to the gateway in another thread
*/ */
struct HeartbeatHandler { struct HeartbeatHandler {
/// The heartbeat interval in milliseconds /// The heartbeat interval in milliseconds
heartbeat_interval: u128, pub heartbeat_interval: u128,
tx: Sender<HeartbeatThreadCommunication>, /// The send channel for the heartbeat thread
pub send: Sender<HeartbeatThreadCommunication>,
/// The handle of the thread
handle: JoinHandle<()>
} }
impl HeartbeatHandler { impl HeartbeatHandler {
@ -580,25 +628,28 @@ impl HeartbeatHandler {
>, >,
>, >,
) -> HeartbeatHandler { ) -> HeartbeatHandler {
let (tx, mut rx) = mpsc::channel(32); let (send, mut receive) = mpsc::channel(32);
task::spawn(async move { let handle: JoinHandle<()> = task::spawn(async move {
let mut last_heartbeat: Instant = time::Instant::now();
let mut last_heartbeat_timestamp: Instant = time::Instant::now();
let mut last_seq_number: Option<u64> = None; let mut last_seq_number: Option<u64> = None;
loop { loop {
// If we received a seq number update, use that as the last seq number // If we received a seq number update, use that as the last seq number
let hb_communication: Result<HeartbeatThreadCommunication, TryRecvError> = let received_communication: Result<HeartbeatThreadCommunication, TryRecvError> =
rx.try_recv(); receive.try_recv();
if hb_communication.is_ok() { if received_communication.is_ok() {
last_seq_number = Some(hb_communication.unwrap().d); last_seq_number = Some(received_communication.unwrap().sequence_number);
} }
if last_heartbeat.elapsed().as_millis() > heartbeat_interval { let should_send = last_heartbeat_timestamp.elapsed().as_millis() >= heartbeat_interval;
if should_send {
println!("GW: Sending Heartbeat.."); println!("GW: Sending Heartbeat..");
let heartbeat = types::GatewayHeartbeat { let heartbeat = types::GatewayHeartbeat {
op: 1, op: GATEWAY_HEARTBEAT,
d: last_seq_number, d: last_seq_number,
}; };
@ -608,14 +659,15 @@ impl HeartbeatHandler {
websocket_tx.lock().await.send(msg).await.unwrap(); websocket_tx.lock().await.send(msg).await.unwrap();
last_heartbeat = time::Instant::now(); last_heartbeat_timestamp = time::Instant::now();
} }
} }
}); });
Self { Self {
heartbeat_interval, heartbeat_interval,
tx, send,
handle,
} }
} }
} }
@ -626,14 +678,14 @@ Either signifies a sequence number update or a received heartbeat ack
*/ */
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
struct HeartbeatThreadCommunication { struct HeartbeatThreadCommunication {
/// An opcode for the communication we received /// The opcode for the communication we received
op: u8, op_code: u8,
/// The sequence number we got from discord /// The sequence number we got from discord
d: u64, sequence_number: u64,
} }
/** /**
Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
an Observable. The Observer is notified when the Observable's data changes. an Observable. The Observer is notified when the Observable's data changes.
In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
*/ */
@ -758,7 +810,7 @@ mod events {
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct Session { pub struct Session {
pub ready: GatewayEvent<types::GatewayReady>, pub ready: GatewayEvent<types::GatewayReady>,
pub ready_supplimental: GatewayEvent<types::GatewayReadySupplemental>, pub ready_supplemental: GatewayEvent<types::GatewayReadySupplemental>,
pub replace: GatewayEvent<types::SessionsReplace> pub replace: GatewayEvent<types::SessionsReplace>
} }
@ -891,7 +943,7 @@ mod example {
} }
#[tokio::test] #[tokio::test]
async fn test_observer_behaviour() { async fn test_observer_behavior() {
let mut event = GatewayEvent::new(types::GatewayResume { let mut event = GatewayEvent::new(types::GatewayResume {
token: "start".to_string(), token: "start".to_string(),
session_id: "start".to_string(), session_id: "start".to_string(),

View File

@ -60,11 +60,16 @@ pub trait WebSocketEvent {}
/// Similar to [GatewayReceivePayload], except we send a [Value] for d whilst we receive a [serde_json::value::RawValue] /// Similar to [GatewayReceivePayload], except we send a [Value] for d whilst we receive a [serde_json::value::RawValue]
/// Also, we never need to send the event name /// Also, we never need to send the event name
pub struct GatewaySendPayload { pub struct GatewaySendPayload {
pub op: u8, #[serde(rename = "op")]
pub op_code: u8,
#[serde(rename = "d")]
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub d: Option<serde_json::Value>, pub event_data: Option<serde_json::Value>,
#[serde(rename = "s")]
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub s: Option<u64>, pub sequence_number: Option<u64>,
} }
impl WebSocketEvent for GatewaySendPayload {} impl WebSocketEvent for GatewaySendPayload {}
@ -76,11 +81,18 @@ impl WebSocketEvent for GatewaySendPayload {}
/// Also, we never need to sent the event name /// Also, we never need to sent the event name
pub struct GatewayReceivePayload<'a> { pub struct GatewayReceivePayload<'a> {
pub op: u8, #[serde(rename = "op")]
pub op_code: u8,
#[serde(borrow)] #[serde(borrow)]
pub d: Option<&'a serde_json::value::RawValue>, #[serde(rename = "d")]
pub s: Option<u64>, pub event_data: Option<&'a serde_json::value::RawValue>,
pub t: Option<String>,
#[serde(rename = "s")]
pub sequence_number: Option<u64>,
#[serde(rename = "t")]
pub event_name: Option<String>,
} }
impl<'a> WebSocketEvent for GatewayReceivePayload<'a> {} impl<'a> WebSocketEvent for GatewayReceivePayload<'a> {}