diff --git a/src/errors.rs b/src/errors.rs index 057f57f..37c165c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -40,7 +40,7 @@ custom_error! { /// Supposed to be sent as numbers, though they are sent as string most of the time? /// /// Also includes errors when initiating a connection and unexpected opcodes - #[derive(PartialEq, Eq)] + #[derive(Clone, PartialEq, Eq)] pub GatewayError // Errors we have received from the gateway UnknownError = "We're not sure what went wrong. Try reconnecting?", @@ -65,3 +65,31 @@ custom_error! { // Other misc errors UnexpectedOpcodeReceivedError{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}", } + +custom_error! { + // Like GatewayError for webrtc errors + // See https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice; + // Also supposed to be sent by numbers, but discord is asdfghgfjkkjldf when it comes to their errors + #[derive(Clone, PartialEq, Eq)] + pub VoiceGatewayError + // Errors we receive + UnknownOpcodeError = "You sent an invalid opcode", + FailedToDecodePayloadError = "You sent an invalid payload in your identifying to the (Webrtc) Gateway", + NotAuthenticatedError = "You sent a payload before identifying with the (Webrtc) Gateway", + AuthenticationFailedError = "The token you sent in your identify payload is incorrect", + AlreadyAuthenticatedError = "You sent more than one identify payload", + SessionNoLongerValidError = "Your session is no longer valid", + SessionTimeoutError = "Your session has timed out", + ServerNotFoundError = "We can't find the server you're trying to connect to", + UnknownProtocolError = "We didn't recognize the protocol you sent", + DisconnectedError = "Channel was deleted, you were kicked, voice server changed, or the main gateway session was dropped. Should not reconnect.", + VoiceServerCrashedError = "The server crashed, try resuming", + UnknownEncryptionModeError = "Server failed to decrypt data", + + // Errors when initiating a gateway connection + CannotConnectError{error: String} = "Cannot connect due to a tungstenite error: {error}", + NonHelloOnInitiateError{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong", + + // Other misc errors + UnexpectedOpcodeReceivedError{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}" +} diff --git a/src/gateway.rs b/src/gateway.rs index 2f90217..badf845 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -69,7 +69,7 @@ const GATEWAY_CALL_SYNC: u8 = 13; const GATEWAY_LAZY_REQUEST: u8 = 14; /// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms -const HEARTBEAT_ACK_TIMEOUT: u128 = 2000; +pub const HEARTBEAT_ACK_TIMEOUT: u128 = 2000; /// Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError]. /// This struct is used internally when handling messages. @@ -1704,7 +1704,7 @@ impl GatewayEvent { } /// Notifies the observers of the GatewayEvent. - async fn notify(&self, new_event_data: T) { + pub async fn notify(&self, new_event_data: T) { for observer in &self.observers { observer.update(&new_event_data); } diff --git a/src/types/events/webrtc/mod.rs b/src/types/events/webrtc/mod.rs index 69d8e32..53735e0 100644 --- a/src/types/events/webrtc/mod.rs +++ b/src/types/events/webrtc/mod.rs @@ -1,17 +1,57 @@ +use super::WebSocketEvent; +use serde::{Deserialize, Serialize}; + pub use identify::*; pub use ready::*; pub use select_protocol::*; -use serde::{Deserialize, Serialize}; +pub use session_description::*; +pub use speaking::*; mod identify; mod ready; mod select_protocol; +mod session_description; +mod speaking; + +#[derive(Debug, Default, Serialize, Clone)] +/// The payload used for sending events to the webrtc gateway +/// Not tha this is very similar to the regular gateway, except we no longer have a sequence number +/// +/// Similar to [WebrtcReceivePayload], except we send a [Value] for d whilst we receive a [serde_json::value::RawValue] +/// Also, we never need to send the event name +pub struct VoiceGatewaySendPayload { + #[serde(rename = "op")] + pub op_code: u8, + + #[serde(rename = "d")] + pub data: serde_json::Value, +} + +impl WebSocketEvent for VoiceGatewaySendPayload {} + +#[derive(Debug, Deserialize, Clone)] +/// The payload used for receiving events from the webrtc gateway +/// Note that this is very similar to the regular gateway, except we no longer have s or t +/// +/// Similar to [WebrtcSendPayload], except we send a [Value] for d whilst we receive a [serde_json::value::RawValue] +/// Also, we never need to sent the event name +pub struct VoiceGatewayReceivePayload<'a> { + #[serde(rename = "op")] + pub op_code: u8, + + #[serde(borrow)] + #[serde(rename = "d")] + pub data: &'a serde_json::value::RawValue, +} + +impl<'a> WebSocketEvent for VoiceGatewayReceivePayload<'a> {} /// The modes of encryption available in webrtc connections; /// See https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-udp-connection-encryption-modes; -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] #[serde(rename_all = "snake_case")] pub enum WebrtcEncryptionMode { + #[default] XSalsa20Poly1305, XSalsa20Poly1305Suffix, XSalsa20Poly1305Lite, @@ -28,4 +68,9 @@ pub const VOICE_HEARTBEAT_ACK: u8 = 6; pub const VOICE_RESUME: u8 = 7; pub const VOICE_HELLO: u8 = 8; pub const VOICE_RESUMED: u8 = 9; +/// See https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#voice-opcodes +pub const VOICE_VIDEO: u8 = 12; pub const VOICE_CLIENT_DISCONENCT: u8 = 13; +/// See https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#voice-opcodes; +/// Sent with empty data from the client, the server responds with the voice backend version; +pub const VOICE_BACKEND_VERSION: u8 = 16; diff --git a/src/types/events/webrtc/session_description.rs b/src/types/events/webrtc/session_description.rs new file mode 100644 index 0000000..cda07fd --- /dev/null +++ b/src/types/events/webrtc/session_description.rs @@ -0,0 +1,14 @@ +use serde::{Deserialize, Serialize}; +use crate::types::WebSocketEvent; +use super::WebrtcEncryptionMode; + +#[derive(Debug, Deserialize, Serialize, Clone, Default)] +/// Event that describes our encryption mode and secret key for encryption +pub struct SessionDescription { + /// The encryption mode we're using in webrtc + pub mode: WebrtcEncryptionMode, + /// The secret key we'll use for encryption + pub secret_key: [u8; 32], +} + +impl WebSocketEvent for SessionDescription {} \ No newline at end of file diff --git a/src/types/events/webrtc/speaking.rs b/src/types/events/webrtc/speaking.rs new file mode 100644 index 0000000..3778266 --- /dev/null +++ b/src/types/events/webrtc/speaking.rs @@ -0,0 +1,35 @@ +use bitflags::bitflags; +use serde::{Deserialize, Serialize}; + +/// Event that tells the server we are speaking; +/// Essentially, what allows us to send udp data and lights up the green circle around your avatar; +/// See https://discord.com/developers/docs/topics/voice-connections#speaking-example-speaking-payload +#[derive(Debug, Deserialize, Serialize, Clone, Default)] +pub struct Speaking { + /// Data about the audio we're transmitting, its type + speaking: SpeakingBitflags, + /// Assuming delay in milliseconds for the audio, should be 0 most of the time + delay: u64, + ssrc: i32, +} + +bitflags! { + /// Bitflags of speaking types; + /// See https://discord.com/developers/docs/topics/voice-connections#speaking; + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Serialize, Deserialize)] + pub struct SpeakingBitflags: u8 { + /// Whether we'll be transmitting normal voice audio + const MICROPHONE = 1 << 0; + /// Whether we'll be transmitting context audio for video, no speaking indicator + const SOUNDSHARE = 1 << 1; + /// Whether we are a priority speaker, lowering audio of other speakers + const PRIORITY = 1 << 2; + } +} + +impl Default for SpeakingBitflags { + /// Returns the default value for these flags, assuming normal microphone audio and not being a priority speaker + fn default() -> Self { + Self::MICROPHONE + } +} diff --git a/src/voice.rs b/src/voice.rs index 8b13789..09fa3c4 100644 --- a/src/voice.rs +++ b/src/voice.rs @@ -1 +1,569 @@ +use futures_util::stream::{SplitSink, SplitStream}; +use futures_util::SinkExt; +use futures_util::StreamExt; +use native_tls::TlsConnector; +use serde_json::json; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::mpsc::Sender; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tokio::time; +use tokio::time::Instant; +use tokio_tungstenite::MaybeTlsStream; +use tokio_tungstenite::{connect_async_tls_with_config, Connector, WebSocketStream}; +use crate::errors::VoiceGatewayError; +use crate::gateway::{GatewayEvent, HEARTBEAT_ACK_TIMEOUT}; +use crate::types::{ + self, SelectProtocol, Speaking, VoiceGatewayReceivePayload, VoiceGatewaySendPayload, + VoiceIdentify, WebSocketEvent, VOICE_BACKEND_VERSION, VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK, + VOICE_HELLO, VOICE_IDENTIFY, VOICE_READY, VOICE_RESUME, VOICE_SELECT_PROTOCOL, + VOICE_SESSION_DESCRIPTION, VOICE_SPEAKING, +}; + +/// Represents a messsage received from the webrtc socket. This will be either a [GatewayReceivePayload], containing webrtc events, or a [WebrtcError]. +/// This struct is used internally when handling messages. +#[derive(Clone, Debug)] +pub struct VoiceGatewayMesssage { + /// The message we received from the server + message: tokio_tungstenite::tungstenite::Message, +} + +impl VoiceGatewayMesssage { + /// Creates self from a tungstenite message + pub fn from_tungstenite_message(message: tokio_tungstenite::tungstenite::Message) -> Self { + Self { message } + } + + /// Parses the message as an error; + /// Returns the error if succesfully parsed, None if the message isn't an error + pub fn error(&self) -> Option { + let content = self.message.to_string(); + + // Some error strings have dots on the end, which we don't care about + let processed_content = content.to_lowercase().replace('.', ""); + + match processed_content.as_str() { + "unknown opcode" | "4001" => Some(VoiceGatewayError::UnknownOpcodeError), + "decode error" | "failed to decode payload" | "4002" => { + Some(VoiceGatewayError::FailedToDecodePayloadError) + } + "not authenticated" | "4003" => Some(VoiceGatewayError::NotAuthenticatedError), + "authentication failed" | "4004" => Some(VoiceGatewayError::AuthenticationFailedError), + "already authenticated" | "4005" => Some(VoiceGatewayError::AlreadyAuthenticatedError), + "session no longer valid" | "4006" => { + Some(VoiceGatewayError::SessionNoLongerValidError) + } + "session timeout" | "4009" => Some(VoiceGatewayError::SessionTimeoutError), + "server not found" | "4011" => Some(VoiceGatewayError::ServerNotFoundError), + "unknown protocol" | "4012" => Some(VoiceGatewayError::UnknownProtocolError), + "disconnected" | "4014" => Some(VoiceGatewayError::DisconnectedError), + "voice server crashed" | "4015" => Some(VoiceGatewayError::VoiceServerCrashedError), + "unknown encryption mode" | "4016" => { + Some(VoiceGatewayError::UnknownEncryptionModeError) + } + _ => None, + } + } + + /// Returns whether or not the message is an error + pub fn is_error(&self) -> bool { + self.error().is_some() + } + + /// Parses the message as a payload; + /// Returns a result of deserializing + pub fn payload(&self) -> Result { + return serde_json::from_str(self.message.to_text().unwrap()); + } + + /// Returns whether or not the message is a payload + pub fn is_payload(&self) -> bool { + // close messages are never payloads, payloads are only text messages + if self.message.is_close() | !self.message.is_text() { + return false; + } + + return self.payload().is_ok(); + } + + /// Returns whether or not the message is empty + pub fn is_empty(&self) -> bool { + self.message.is_empty() + } +} + +/// Represents a handle to a Voice Gateway connection. +/// Using this handle you can send Gateway Events directly. +#[derive(Debug)] +pub struct VoiceGatewayHandle { + pub url: String, + pub events: Arc>, + pub websocket_send: Arc< + Mutex< + SplitSink< + WebSocketStream>, + tokio_tungstenite::tungstenite::Message, + >, + >, + >, + pub handle: JoinHandle<()>, + /// Tells gateway tasks to close + kill_send: tokio::sync::broadcast::Sender<()>, +} + +impl VoiceGatewayHandle { + /// Sends json to the gateway with an opcode + async fn send_json(&self, op_code: u8, to_send: serde_json::Value) { + let gateway_payload = VoiceGatewaySendPayload { + op_code, + data: to_send, + }; + + let payload_json = serde_json::to_string(&gateway_payload).unwrap(); + + let message = tokio_tungstenite::tungstenite::Message::text(payload_json); + + self.websocket_send + .lock() + .await + .send(message) + .await + .unwrap(); + } + + /// Sends a voice identify event to the gateway + pub async fn send_identify(&self, to_send: VoiceIdentify) { + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + println!("VGW: Sending Identify.."); + + self.send_json(VOICE_IDENTIFY, to_send_value).await; + } + + /// Sends a select protocol event to the gateway + pub async fn send_select_protocol(&self, to_send: SelectProtocol) { + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + println!("VGW: Sending Select Protocol"); + + self.send_json(VOICE_SELECT_PROTOCOL, to_send_value).await; + } + + /// Sends a speaking event to the gateway + pub async fn send_speaking(&self, to_send: Speaking) { + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + println!("VGW: Sending Speaking"); + + self.send_json(VOICE_SPEAKING, to_send_value).await; + } + + /// Sends a voice backend version request to the gateway + pub async fn send_voice_backend_version_request(&self) { + let data_empty_object = json!("{}"); + + println!("VGW: Requesting voice backend version"); + + self.send_json(VOICE_BACKEND_VERSION, data_empty_object) + .await; + } + + /// Closes the websocket connection and stops all gateway tasks; + /// + /// Esentially pulls the plug on the voice gateway, leaving it possible to resume; + pub async fn close(&self) { + self.kill_send.send(()).unwrap(); + self.websocket_send.lock().await.close().await.unwrap(); + } +} +pub struct VoiceGateway { + pub events: Arc>, + heartbeat_handler: VoiceHeartbeatHandler, + pub websocket_send: Arc< + Mutex< + SplitSink< + WebSocketStream>, + tokio_tungstenite::tungstenite::Message, + >, + >, + >, + pub websocket_receive: SplitStream>>, + kill_send: tokio::sync::broadcast::Sender<()>, +} + +impl VoiceGateway { + #[allow(clippy::new_ret_no_self)] + pub async fn new(websocket_url: String) -> Result { + // Append the needed things to the websocket url + let processed_url = format!("wss://{}?v=4", websocket_url); + + let (websocket_stream, _) = match connect_async_tls_with_config( + &processed_url, + None, + false, + Some(Connector::NativeTls( + TlsConnector::builder().build().unwrap(), + )), + ) + .await + { + Ok(websocket_stream) => websocket_stream, + Err(e) => { + return Err(VoiceGatewayError::CannotConnectError { + error: e.to_string(), + }) + } + }; + + let (websocket_send, mut websocket_receive) = websocket_stream.split(); + + let shared_websocket_send = Arc::new(Mutex::new(websocket_send)); + + // Create a shared broadcast channel for killing all gateway tasks + let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16); + + // Wait for the first hello and then spawn both tasks so we avoid nested tasks + // This automatically spawns the heartbeat task, but from the main thread + let msg = websocket_receive.next().await.unwrap().unwrap(); + let gateway_payload: VoiceGatewayReceivePayload = + serde_json::from_str(msg.to_text().unwrap()).unwrap(); + + if gateway_payload.op_code != VOICE_HELLO { + return Err(VoiceGatewayError::NonHelloOnInitiateError { + opcode: gateway_payload.op_code, + }); + } + + println!("VGW: Received Hello"); + + // The hello data is the same on voice and normal gateway + let gateway_hello: types::HelloData = + serde_json::from_str(gateway_payload.data.get()).unwrap(); + + let voice_events = voice_events::VoiceEvents::default(); + let shared_events = Arc::new(Mutex::new(voice_events)); + + let mut gateway = VoiceGateway { + events: shared_events.clone(), + heartbeat_handler: VoiceHeartbeatHandler::new( + gateway_hello.heartbeat_interval, + 1, // to:do actually compute nonce + shared_websocket_send.clone(), + kill_send.subscribe(), + ), + websocket_send: shared_websocket_send.clone(), + websocket_receive, + kill_send: kill_send.clone(), + }; + + // Now we can continuously check for messages in a different task, since we aren't going to receive another hello + let handle: JoinHandle<()> = tokio::spawn(async move { + gateway.gateway_listen_task().await; + }); + + Ok(VoiceGatewayHandle { + url: websocket_url.clone(), + events: shared_events, + websocket_send: shared_websocket_send.clone(), + handle, + kill_send: kill_send.clone(), + }) + } + + /// The main gateway listener task; + /// + /// Can only be stopped by closing the websocket, cannot be made to listen for kill + pub async fn gateway_listen_task(&mut self) { + loop { + let msg = self.websocket_receive.next().await; + + if let Some(Ok(message)) = msg { + self.handle_message(VoiceGatewayMesssage::from_tungstenite_message(message)) + .await; + continue; + } + + // We couldn't receive the next message or it was an error, something is wrong with the websocket, close + println!("VGW: Websocket is broken, stopping gateway"); + break; + } + } + + /// Closes the websocket connection and stops all tasks + async fn close(&mut self) { + self.kill_send.send(()).unwrap(); + self.websocket_send.lock().await.close().await.unwrap(); + } + + /// Deserializes and updates a dispatched event, when we already know its type; + /// (Called for every event in handle_message) + async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>( + data: &'a str, + event: &mut GatewayEvent, + ) -> Result<(), serde_json::Error> { + let data_deserialize_result: Result = serde_json::from_str(data); + + if data_deserialize_result.is_err() { + return Err(data_deserialize_result.err().unwrap()); + } + + event.notify(data_deserialize_result.unwrap()).await; + Ok(()) + } + + /// This handles a message as a websocket event and updates its events along with the events' observers + pub async fn handle_message(&mut self, msg: VoiceGatewayMesssage) { + if msg.is_empty() { + return; + } + + if !msg.is_error() && !msg.is_payload() { + println!( + "Message unrecognised: {:?}, please open an issue on the chorus github", + msg.message.to_string() + ); + return; + } + + // To:do: handle errors in a good way, maybe observers like events? + if msg.is_error() { + println!("VGW: Received error, connection will close.."); + + let _error = msg.error(); + + {} + + self.close().await; + return; + } + + let gateway_payload = msg.payload().unwrap(); + + match gateway_payload.op_code { + VOICE_READY => { + let event = &mut self.events.lock().await.voice_ready; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + println!("Failed to parse VOICE_READY ({})", result.err().unwrap()); + return; + } + } + VOICE_SESSION_DESCRIPTION => { + let event = &mut self.events.lock().await.session_description; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + println!( + "Failed to parse VOICE_SELECT_PROTOCOL ({})", + result.err().unwrap() + ); + return; + } + } + // We received a heartbeat from the server + // "Discord may send the app a Heartbeat (opcode 1) event, in which case the app should send a Heartbeat event immediately." + VOICE_HEARTBEAT => { + println!("VGW: Received Heartbeat // Heartbeat Request"); + + // Tell the heartbeat handler it should send a heartbeat right away + let heartbeat_communication = VoiceHeartbeatThreadCommunication { + updated_nonce: None, + op_code: Some(VOICE_HEARTBEAT), + }; + + self.heartbeat_handler + .send + .send(heartbeat_communication) + .await + .unwrap(); + } + VOICE_HEARTBEAT_ACK => { + println!("VGW: Received Heartbeat ACK"); + + // Tell the heartbeat handler we received an ack + + let heartbeat_communication = VoiceHeartbeatThreadCommunication { + updated_nonce: None, + op_code: Some(VOICE_HEARTBEAT_ACK), + }; + + self.heartbeat_handler + .send + .send(heartbeat_communication) + .await + .unwrap(); + } + VOICE_IDENTIFY | VOICE_SELECT_PROTOCOL | VOICE_RESUME => { + let error = VoiceGatewayError::UnexpectedOpcodeReceivedError { + opcode: gateway_payload.op_code, + }; + Err::<(), VoiceGatewayError>(error).unwrap(); + } + _ => { + println!("Received unrecognized voice gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code); + } + } + } +} + +/// Handles sending heartbeats to the voice gateway in another thread +struct VoiceHeartbeatHandler { + /// The heartbeat interval in milliseconds + pub heartbeat_interval: u128, + /// The send channel for the heartbeat thread + pub send: Sender, + /// The handle of the thread + handle: JoinHandle<()>, +} + +impl VoiceHeartbeatHandler { + pub fn new( + heartbeat_interval: u128, + starting_nonce: u64, + websocket_tx: Arc< + Mutex< + SplitSink< + WebSocketStream>, + tokio_tungstenite::tungstenite::Message, + >, + >, + >, + kill_rc: tokio::sync::broadcast::Receiver<()>, + ) -> Self { + let (send, receive) = tokio::sync::mpsc::channel(32); + let kill_receive = kill_rc.resubscribe(); + + let handle: JoinHandle<()> = tokio::spawn(async move { + Self::heartbeat_task( + websocket_tx, + heartbeat_interval, + starting_nonce, + receive, + kill_receive, + ) + .await; + }); + + Self { + heartbeat_interval, + send, + handle, + } + } + + /// The main heartbeat task; + /// + /// Can be killed by the kill broadcast; + /// If the websocket is closed, will die out next time it tries to send a heartbeat; + pub async fn heartbeat_task( + websocket_tx: Arc< + Mutex< + SplitSink< + WebSocketStream>, + tokio_tungstenite::tungstenite::Message, + >, + >, + >, + heartbeat_interval: u128, + starting_nonce: u64, + mut receive: tokio::sync::mpsc::Receiver, + mut kill_receive: tokio::sync::broadcast::Receiver<()>, + ) { + let mut last_heartbeat_timestamp: Instant = time::Instant::now(); + let mut last_heartbeat_acknowledged = true; + let mut nonce: u64 = starting_nonce; + + loop { + let should_shutdown = kill_receive.try_recv().is_ok(); + if should_shutdown { + break; + } + + let mut should_send; + + let time_to_send = last_heartbeat_timestamp.elapsed().as_millis() >= heartbeat_interval; + + should_send = time_to_send; + + let received_communication: Result = + receive.try_recv(); + if received_communication.is_ok() { + let communication = received_communication.unwrap(); + + // If we received a nonce update, use that nonce now + if communication.updated_nonce.is_some() { + nonce = communication.updated_nonce.unwrap(); + } + + if communication.op_code.is_some() { + match communication.op_code.unwrap() { + VOICE_HEARTBEAT => { + // As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately + should_send = true; + } + VOICE_HEARTBEAT_ACK => { + // The server received our heartbeat + last_heartbeat_acknowledged = true; + } + _ => {} + } + } + } + + // If the server hasn't acknowledged our heartbeat we should resend it + if !last_heartbeat_acknowledged + && last_heartbeat_timestamp.elapsed().as_millis() > HEARTBEAT_ACK_TIMEOUT + { + should_send = true; + println!("VGW: Timed out waiting for a heartbeat ack, resending"); + } + + if should_send { + println!("VGW: Sending Heartbeat.."); + + let heartbeat = VoiceGatewaySendPayload { + op_code: VOICE_HEARTBEAT, + data: nonce.into(), + }; + + let heartbeat_json = serde_json::to_string(&heartbeat).unwrap(); + + let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); + + let send_result = websocket_tx.lock().await.send(msg).await; + if send_result.is_err() { + // We couldn't send, the websocket is broken + println!("VGW: Couldnt send heartbeat, websocket seems broken"); + break; + } + + last_heartbeat_timestamp = time::Instant::now(); + last_heartbeat_acknowledged = false; + } + } + } +} + +/// Used for communications between the voice heartbeat and voice gateway thread. +/// Either signifies a nonce update, a heartbeat ACK or a Heartbeat request by the server +#[derive(Clone, Copy, Debug)] +struct VoiceHeartbeatThreadCommunication { + /// The opcode for the communication we received, if relevant + op_code: Option, + /// The new nonce to use, if any + updated_nonce: Option, +} + +mod voice_events { + use crate::types::{SessionDescription, VoiceReady}; + + use super::*; + + #[derive(Default, Debug)] + pub struct VoiceEvents { + pub voice_ready: GatewayEvent, + pub session_description: GatewayEvent, + } +}