From d767e3e76712e7474a3fef64b20b21a394179001 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Thu, 28 Dec 2023 09:21:47 +0100 Subject: [PATCH] feat: first try at vgw wasm compat --- src/voice/gateway.rs | 698 ---------------------- src/voice/gateway/backends/mod.rs | 23 + src/voice/gateway/backends/tungstenite.rs | 65 ++ src/voice/gateway/backends/wasm.rs | 48 ++ src/voice/gateway/events.rs | 24 + src/voice/gateway/gateway.rs | 335 +++++++++++ src/voice/gateway/handle.rs | 101 ++++ src/voice/gateway/heartbeat.rs | 160 +++++ src/voice/gateway/message.rs | 39 ++ src/voice/gateway/mod.rs | 11 + src/voice/udp.rs | 2 +- 11 files changed, 807 insertions(+), 699 deletions(-) delete mode 100644 src/voice/gateway.rs create mode 100644 src/voice/gateway/backends/mod.rs create mode 100644 src/voice/gateway/backends/tungstenite.rs create mode 100644 src/voice/gateway/backends/wasm.rs create mode 100644 src/voice/gateway/events.rs create mode 100644 src/voice/gateway/gateway.rs create mode 100644 src/voice/gateway/handle.rs create mode 100644 src/voice/gateway/heartbeat.rs create mode 100644 src/voice/gateway/message.rs create mode 100644 src/voice/gateway/mod.rs diff --git a/src/voice/gateway.rs b/src/voice/gateway.rs deleted file mode 100644 index 3616cd3..0000000 --- a/src/voice/gateway.rs +++ /dev/null @@ -1,698 +0,0 @@ -use futures_util::stream::{SplitSink, SplitStream}; -use futures_util::SinkExt; -use futures_util::StreamExt; -use log::{debug, info, trace, warn}; -use serde_json::json; -use std::sync::Arc; -use std::time::Duration; -use tokio::net::TcpStream; -use tokio::sync::mpsc::Sender; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; -use tokio::time::Instant; -use tokio::time::{self, sleep_until}; -use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::{connect_async_tls_with_config, Connector, WebSocketStream}; - -use crate::errors::VoiceGatewayError; -use crate::gateway::{heartbeat::HEARTBEAT_ACK_TIMEOUT, GatewayEvent}; -use crate::types::{ - self, SelectProtocol, Speaking, SsrcDefinition, VoiceGatewayReceivePayload, - VoiceGatewaySendPayload, VoiceIdentify, WebSocketEvent, VOICE_BACKEND_VERSION, - VOICE_CLIENT_CONNECT_FLAGS, VOICE_CLIENT_CONNECT_PLATFORM, VOICE_CLIENT_DISCONNECT, - VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK, VOICE_HELLO, VOICE_IDENTIFY, VOICE_MEDIA_SINK_WANTS, - VOICE_READY, VOICE_RESUME, VOICE_SELECT_PROTOCOL, VOICE_SESSION_DESCRIPTION, - VOICE_SESSION_UPDATE, VOICE_SPEAKING, VOICE_SSRC_DEFINITION, -}; - -use self::voice_events::VoiceEvents; - -/// Represents a messsage received from the webrtc socket. This will be either a [GatewayReceivePayload], containing webrtc events, or a [WebrtcError]. -/// This struct is used internally when handling messages. -#[derive(Clone, Debug)] -pub struct VoiceGatewayMesssage { - /// The message we received from the server - message: tokio_tungstenite::tungstenite::Message, -} - -impl VoiceGatewayMesssage { - /// Creates self from a tungstenite message - pub fn from_tungstenite_message(message: tokio_tungstenite::tungstenite::Message) -> Self { - Self { message } - } - - /// Parses the message as an error; - /// Returns the error if succesfully parsed, None if the message isn't an error - pub fn error(&self) -> Option { - let content = self.message.to_string(); - - // Some error strings have dots on the end, which we don't care about - let processed_content = content.to_lowercase().replace('.', ""); - - match processed_content.as_str() { - "unknown opcode" | "4001" => Some(VoiceGatewayError::UnknownOpcode), - "decode error" | "failed to decode payload" | "4002" => { - Some(VoiceGatewayError::FailedToDecodePayload) - } - "not authenticated" | "4003" => Some(VoiceGatewayError::NotAuthenticated), - "authentication failed" | "4004" => Some(VoiceGatewayError::AuthenticationFailed), - "already authenticated" | "4005" => Some(VoiceGatewayError::AlreadyAuthenticated), - "session is no longer valid" | "4006" => Some(VoiceGatewayError::SessionNoLongerValid), - "session timeout" | "4009" => Some(VoiceGatewayError::SessionTimeout), - "server not found" | "4011" => Some(VoiceGatewayError::ServerNotFound), - "unknown protocol" | "4012" => Some(VoiceGatewayError::UnknownProtocol), - "disconnected" | "4014" => Some(VoiceGatewayError::Disconnected), - "voice server crashed" | "4015" => Some(VoiceGatewayError::VoiceServerCrashed), - "unknown encryption mode" | "4016" => Some(VoiceGatewayError::UnknownEncryptionMode), - _ => None, - } - } - - /// Returns whether or not the message is an error - pub fn is_error(&self) -> bool { - self.error().is_some() - } - - /// Parses the message as a payload; - /// Returns a result of deserializing - pub fn payload(&self) -> Result { - return serde_json::from_str(self.message.to_text().unwrap()); - } - - /// Returns whether or not the message is a payload - pub fn is_payload(&self) -> bool { - // close messages are never payloads, payloads are only text messages - if self.message.is_close() | !self.message.is_text() { - return false; - } - - return self.payload().is_ok(); - } - - /// Returns whether or not the message is empty - pub fn is_empty(&self) -> bool { - self.message.is_empty() - } -} - -/// Represents a handle to a Voice Gateway connection. -/// Using this handle you can send Gateway Events directly. -#[derive(Debug, Clone)] -pub struct VoiceGatewayHandle { - pub url: String, - pub events: Arc>, - pub websocket_send: Arc< - Mutex< - SplitSink< - WebSocketStream>, - tokio_tungstenite::tungstenite::Message, - >, - >, - >, - /// Tells gateway tasks to close - kill_send: tokio::sync::broadcast::Sender<()>, -} - -impl VoiceGatewayHandle { - /// Sends json to the gateway with an opcode - async fn send_json(&self, op_code: u8, to_send: serde_json::Value) { - let gateway_payload = VoiceGatewaySendPayload { - op_code, - data: to_send, - }; - - let payload_json = serde_json::to_string(&gateway_payload).unwrap(); - - let message = tokio_tungstenite::tungstenite::Message::text(payload_json); - - self.websocket_send - .lock() - .await - .send(message) - .await - .unwrap(); - } - - /// Sends a voice identify event to the gateway - pub async fn send_identify(&self, to_send: VoiceIdentify) { - let to_send_value = serde_json::to_value(&to_send).unwrap(); - - trace!("VGW: Sending Identify.."); - - self.send_json(VOICE_IDENTIFY, to_send_value).await; - } - - /// Sends a select protocol event to the gateway - pub async fn send_select_protocol(&self, to_send: SelectProtocol) { - let to_send_value = serde_json::to_value(&to_send).unwrap(); - - trace!("VGW: Sending Select Protocol"); - - self.send_json(VOICE_SELECT_PROTOCOL, to_send_value).await; - } - - /// Sends a speaking event to the gateway - pub async fn send_speaking(&self, to_send: Speaking) { - let to_send_value = serde_json::to_value(&to_send).unwrap(); - - trace!("VGW: Sending Speaking"); - - self.send_json(VOICE_SPEAKING, to_send_value).await; - } - - /// Sends an ssrc definition event - pub async fn send_ssrc_definition(&self, to_send: SsrcDefinition) { - let to_send_value = serde_json::to_value(&to_send).unwrap(); - - trace!("VGW: Sending SsrcDefinition"); - - self.send_json(VOICE_SSRC_DEFINITION, to_send_value).await; - } - - /// Sends a voice backend version request to the gateway - pub async fn send_voice_backend_version_request(&self) { - let data_empty_object = json!("{}"); - - trace!("VGW: Requesting voice backend version"); - - self.send_json(VOICE_BACKEND_VERSION, data_empty_object) - .await; - } - - /// Closes the websocket connection and stops all gateway tasks; - /// - /// Esentially pulls the plug on the voice gateway, leaving it possible to resume; - pub async fn close(&self) { - self.kill_send.send(()).unwrap(); - self.websocket_send.lock().await.close().await.unwrap(); - } -} - -#[derive(Debug)] -pub struct VoiceGateway { - events: Arc>, - heartbeat_handler: VoiceHeartbeatHandler, - websocket_send: Arc< - Mutex< - SplitSink< - WebSocketStream>, - tokio_tungstenite::tungstenite::Message, - >, - >, - >, - websocket_receive: SplitStream>>, - kill_send: tokio::sync::broadcast::Sender<()>, -} - -impl VoiceGateway { - #[allow(clippy::new_ret_no_self)] - pub async fn spawn(websocket_url: String) -> Result { - // Append the needed things to the websocket url - let processed_url = format!("wss://{}/?v=7", websocket_url); - trace!("Created voice socket url: {}", processed_url.clone()); - - let mut roots = rustls::RootCertStore::empty(); - for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") - { - roots.add(&rustls::Certificate(cert.0)).unwrap(); - } - let (websocket_stream, _) = match connect_async_tls_with_config( - &processed_url, - None, - false, - Some(Connector::Rustls( - rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(roots) - .with_no_client_auth() - .into(), - )), - ) - .await - { - Ok(websocket_stream) => websocket_stream, - Err(e) => { - return Err(VoiceGatewayError::CannotConnect { - error: e.to_string(), - }) - } - }; - - let (websocket_send, mut websocket_receive) = websocket_stream.split(); - - let shared_websocket_send = Arc::new(Mutex::new(websocket_send)); - - // Create a shared broadcast channel for killing all gateway tasks - let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16); - - // Wait for the first hello and then spawn both tasks so we avoid nested tasks - // This automatically spawns the heartbeat task, but from the main thread - let msg = websocket_receive.next().await.unwrap().unwrap(); - let gateway_payload: VoiceGatewayReceivePayload = - serde_json::from_str(msg.to_text().unwrap()).unwrap(); - - if gateway_payload.op_code != VOICE_HELLO { - return Err(VoiceGatewayError::NonHelloOnInitiate { - opcode: gateway_payload.op_code, - }); - } - - info!("VGW: Received Hello"); - - // The hello data for voice gateways is in float milliseconds, so we convert it to f64 seconds - let gateway_hello: types::VoiceHelloData = - serde_json::from_str(gateway_payload.data.get()).unwrap(); - let heartbeat_interval_seconds: f64 = gateway_hello.heartbeat_interval / 1000.0; - - let voice_events = VoiceEvents::default(); - let shared_events = Arc::new(Mutex::new(voice_events)); - - let mut gateway = VoiceGateway { - events: shared_events.clone(), - heartbeat_handler: VoiceHeartbeatHandler::new( - Duration::from_secs_f64(heartbeat_interval_seconds), - 1, // to:do actually compute nonce - shared_websocket_send.clone(), - kill_send.subscribe(), - ), - websocket_send: shared_websocket_send.clone(), - websocket_receive, - kill_send: kill_send.clone(), - }; - - // Now we can continuously check for messages in a different task, since we aren't going to receive another hello - tokio::spawn(async move { - gateway.gateway_listen_task().await; - }); - - Ok(VoiceGatewayHandle { - url: websocket_url.clone(), - events: shared_events, - websocket_send: shared_websocket_send.clone(), - kill_send: kill_send.clone(), - }) - } - - /// The main gateway listener task; - /// - /// Can only be stopped by closing the websocket, cannot be made to listen for kill - pub async fn gateway_listen_task(&mut self) { - loop { - let msg = self.websocket_receive.next().await; - - if let Some(Ok(message)) = msg { - self.handle_message(VoiceGatewayMesssage::from_tungstenite_message(message)) - .await; - continue; - } - - // We couldn't receive the next message or it was an error, something is wrong with the websocket, close - warn!("VGW: Websocket is broken, stopping gateway"); - break; - } - } - - /// Closes the websocket connection and stops all tasks - async fn close(&mut self) { - self.kill_send.send(()).unwrap(); - self.websocket_send.lock().await.close().await.unwrap(); - } - - /// Deserializes and updates a dispatched event, when we already know its type; - /// (Called for every event in handle_message) - async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>( - data: &'a str, - event: &mut GatewayEvent, - ) -> Result<(), serde_json::Error> { - let data_deserialize_result: Result = serde_json::from_str(data); - - if data_deserialize_result.is_err() { - return Err(data_deserialize_result.err().unwrap()); - } - - event.notify(data_deserialize_result.unwrap()).await; - Ok(()) - } - - /// This handles a message as a websocket event and updates its events along with the events' observers - pub async fn handle_message(&mut self, msg: VoiceGatewayMesssage) { - if msg.is_empty() { - return; - } - - if !msg.is_error() && !msg.is_payload() { - warn!( - "Message unrecognised: {:?}, please open an issue on the chorus github", - msg.message.to_string() - ); - return; - } - - if msg.is_error() { - let error = msg.error().unwrap(); - - warn!("VGW: Received error, connection will close.."); - - self.close().await; - - self.events.lock().await.error.notify(error).await; - - return; - } - - let gateway_payload = msg.payload().unwrap(); - - // See - match gateway_payload.op_code { - VOICE_READY => { - trace!("VGW: Received READY!"); - - let event = &mut self.events.lock().await.voice_ready; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!("Failed to parse VOICE_READY ({})", result.err().unwrap()); - } - } - VOICE_BACKEND_VERSION => { - trace!("VGW: Received Backend Version"); - - let event = &mut self.events.lock().await.backend_version; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!( - "Failed to parse VOICE_BACKEND_VERSION ({})", - result.err().unwrap() - ); - } - } - VOICE_SESSION_DESCRIPTION => { - trace!("VGW: Received Session Description"); - - let event = &mut self.events.lock().await.session_description; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!( - "Failed to parse VOICE_SESSION_DESCRIPTION ({})", - result.err().unwrap() - ); - } - } - VOICE_SESSION_UPDATE => { - trace!("VGW: Received Session Update"); - - let event = &mut self.events.lock().await.session_update; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!( - "Failed to parse VOICE_SESSION_UPDATE ({})", - result.err().unwrap() - ); - } - } - VOICE_SPEAKING => { - trace!("VGW: Received Speaking"); - - let event = &mut self.events.lock().await.speaking; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!("Failed to parse VOICE_SPEAKING ({})", result.err().unwrap()); - } - } - VOICE_SSRC_DEFINITION => { - trace!("VGW: Received Ssrc Definition"); - - let event = &mut self.events.lock().await.ssrc_definition; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!( - "Failed to parse VOICE_SSRC_DEFINITION ({})", - result.err().unwrap() - ); - } - } - VOICE_CLIENT_DISCONNECT => { - trace!("VGW: Received Client Disconnect"); - - let event = &mut self.events.lock().await.client_disconnect; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!( - "Failed to parse VOICE_CLIENT_DISCONNECT ({})", - result.err().unwrap() - ); - } - } - VOICE_CLIENT_CONNECT_FLAGS => { - trace!("VGW: Received Client Connect Flags"); - - let event = &mut self.events.lock().await.client_connect_flags; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!( - "Failed to parse VOICE_CLIENT_CONNECT_FLAGS ({})", - result.err().unwrap() - ); - } - } - VOICE_CLIENT_CONNECT_PLATFORM => { - trace!("VGW: Received Client Connect Platform"); - - let event = &mut self.events.lock().await.client_connect_platform; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!( - "Failed to parse VOICE_CLIENT_CONNECT_PLATFORM ({})", - result.err().unwrap() - ); - } - } - VOICE_MEDIA_SINK_WANTS => { - trace!("VGW: Received Media Sink Wants"); - - let event = &mut self.events.lock().await.media_sink_wants; - let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; - if result.is_err() { - warn!( - "Failed to parse VOICE_MEDIA_SINK_WANTS ({})", - result.err().unwrap() - ); - } - } - // We received a heartbeat from the server - // "Discord may send the app a Heartbeat (opcode 1) event, in which case the app should send a Heartbeat event immediately." - VOICE_HEARTBEAT => { - trace!("VGW: Received Heartbeat // Heartbeat Request"); - - // Tell the heartbeat handler it should send a heartbeat right away - let heartbeat_communication = VoiceHeartbeatThreadCommunication { - updated_nonce: None, - op_code: Some(VOICE_HEARTBEAT), - }; - - self.heartbeat_handler - .send - .send(heartbeat_communication) - .await - .unwrap(); - } - VOICE_HEARTBEAT_ACK => { - trace!("VGW: Received Heartbeat ACK"); - - // Tell the heartbeat handler we received an ack - - let heartbeat_communication = VoiceHeartbeatThreadCommunication { - updated_nonce: None, - op_code: Some(VOICE_HEARTBEAT_ACK), - }; - - self.heartbeat_handler - .send - .send(heartbeat_communication) - .await - .unwrap(); - } - VOICE_IDENTIFY | VOICE_SELECT_PROTOCOL | VOICE_RESUME => { - info!( - "VGW: Received unexpected opcode ({}) for current state. This might be due to a faulty server implementation and is likely not the fault of chorus.", - gateway_payload.op_code - ); - } - _ => { - warn!("VGW: Received unrecognized voice gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code); - } - } - } -} - -/// Handles sending heartbeats to the voice gateway in another thread -#[allow(dead_code)] // FIXME: Remove this, once all fields of VoiceHeartbeatHandler are used -#[derive(Debug)] -struct VoiceHeartbeatHandler { - /// The heartbeat interval in milliseconds - pub heartbeat_interval: Duration, - /// The send channel for the heartbeat thread - pub send: Sender, - /// The handle of the thread - handle: JoinHandle<()>, -} - -impl VoiceHeartbeatHandler { - pub fn new( - heartbeat_interval: Duration, - starting_nonce: u64, - websocket_tx: Arc< - Mutex< - SplitSink< - WebSocketStream>, - tokio_tungstenite::tungstenite::Message, - >, - >, - >, - kill_rc: tokio::sync::broadcast::Receiver<()>, - ) -> Self { - let (send, receive) = tokio::sync::mpsc::channel(32); - let kill_receive = kill_rc.resubscribe(); - - let handle: JoinHandle<()> = tokio::spawn(async move { - Self::heartbeat_task( - websocket_tx, - heartbeat_interval, - starting_nonce, - receive, - kill_receive, - ) - .await; - }); - - Self { - heartbeat_interval, - send, - handle, - } - } - - /// The main heartbeat task; - /// - /// Can be killed by the kill broadcast; - /// If the websocket is closed, will die out next time it tries to send a heartbeat; - pub async fn heartbeat_task( - websocket_tx: Arc< - Mutex< - SplitSink< - WebSocketStream>, - tokio_tungstenite::tungstenite::Message, - >, - >, - >, - heartbeat_interval: Duration, - starting_nonce: u64, - mut receive: tokio::sync::mpsc::Receiver, - mut kill_receive: tokio::sync::broadcast::Receiver<()>, - ) { - let mut last_heartbeat_timestamp: Instant = time::Instant::now(); - let mut last_heartbeat_acknowledged = true; - let mut nonce: u64 = starting_nonce; - - loop { - if kill_receive.try_recv().is_ok() { - trace!("VGW: Closing heartbeat task"); - break; - } - - let timeout = if last_heartbeat_acknowledged { - heartbeat_interval - } else { - // If the server hasn't acknowledged our heartbeat we should resend it - Duration::from_millis(HEARTBEAT_ACK_TIMEOUT) - }; - - let mut should_send = false; - - tokio::select! { - () = sleep_until(last_heartbeat_timestamp + timeout) => { - should_send = true; - } - Some(communication) = receive.recv() => { - // If we received a nonce update, use that nonce now - if communication.updated_nonce.is_some() { - nonce = communication.updated_nonce.unwrap(); - } - - if let Some(op_code) = communication.op_code { - match op_code { - VOICE_HEARTBEAT => { - // As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately - should_send = true; - } - VOICE_HEARTBEAT_ACK => { - // The server received our heartbeat - last_heartbeat_acknowledged = true; - } - _ => {} - } - } - } - } - - if should_send { - trace!("VGW: Sending Heartbeat.."); - - let heartbeat = VoiceGatewaySendPayload { - op_code: VOICE_HEARTBEAT, - data: nonce.into(), - }; - - let heartbeat_json = serde_json::to_string(&heartbeat).unwrap(); - - let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); - - let send_result = websocket_tx.lock().await.send(msg).await; - if send_result.is_err() { - // We couldn't send, the websocket is broken - warn!("VGW: Couldnt send heartbeat, websocket seems broken"); - break; - } - - last_heartbeat_timestamp = time::Instant::now(); - last_heartbeat_acknowledged = false; - } - } - } -} - -/// Used for communications between the voice heartbeat and voice gateway thread. -/// Either signifies a nonce update, a heartbeat ACK or a Heartbeat request by the server -#[derive(Clone, Copy, Debug)] -struct VoiceHeartbeatThreadCommunication { - /// The opcode for the communication we received, if relevant - op_code: Option, - /// The new nonce to use, if any - updated_nonce: Option, -} - -pub mod voice_events { - use crate::{ - errors::VoiceGatewayError, - gateway::GatewayEvent, - types::{ - SessionDescription, SessionUpdate, Speaking, SsrcDefinition, VoiceBackendVersion, - VoiceClientConnectFlags, VoiceClientConnectPlatform, VoiceClientDisconnection, - VoiceMediaSinkWants, VoiceReady, - }, - }; - - #[derive(Default, Debug)] - pub struct VoiceEvents { - pub voice_ready: GatewayEvent, - pub backend_version: GatewayEvent, - pub session_description: GatewayEvent, - pub session_update: GatewayEvent, - pub speaking: GatewayEvent, - pub ssrc_definition: GatewayEvent, - pub client_disconnect: GatewayEvent, - pub client_connect_flags: GatewayEvent, - pub client_connect_platform: GatewayEvent, - pub media_sink_wants: GatewayEvent, - pub error: GatewayEvent, - } -} diff --git a/src/voice/gateway/backends/mod.rs b/src/voice/gateway/backends/mod.rs new file mode 100644 index 0000000..edb5dc9 --- /dev/null +++ b/src/voice/gateway/backends/mod.rs @@ -0,0 +1,23 @@ +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub mod tungstenite; +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub use tungstenite::*; + +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub mod wasm; +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub use wasm::*; + +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub type Sink = tungstenite::TungsteniteSink; +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub type Stream = tungstenite::TungsteniteStream; +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub type WebSocketBackend = tungstenite::TungsteniteBackend; + +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub type Sink = wasm::WasmSink; +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub type Stream = wasm::WasmStream; +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub type WebSocketBackend = wasm::WasmBackend; diff --git a/src/voice/gateway/backends/tungstenite.rs b/src/voice/gateway/backends/tungstenite.rs new file mode 100644 index 0000000..090fdb9 --- /dev/null +++ b/src/voice/gateway/backends/tungstenite.rs @@ -0,0 +1,65 @@ +use futures_util::{ + stream::{SplitSink, SplitStream}, + StreamExt, +}; +use tokio::net::TcpStream; +use tokio_tungstenite::{ + connect_async_tls_with_config, tungstenite, Connector, MaybeTlsStream, WebSocketStream, +}; + +use crate::{errors::VoiceGatewayError, voice::gateway::VoiceGatewayMesssage}; + +#[derive(Debug, Clone)] +pub struct TungsteniteBackend; + +// These could be made into inherent associated types when that's stabilized +pub type TungsteniteSink = + SplitSink>, tungstenite::Message>; +pub type TungsteniteStream = SplitStream>>; + +impl TungsteniteBackend { + pub async fn connect( + websocket_url: &str, + ) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::VoiceGatewayError> { + let mut roots = rustls::RootCertStore::empty(); + for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") + { + roots.add(&rustls::Certificate(cert.0)).unwrap(); + } + let (websocket_stream, _) = match connect_async_tls_with_config( + websocket_url, + None, + false, + Some(Connector::Rustls( + rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth() + .into(), + )), + ) + .await + { + Ok(websocket_stream) => websocket_stream, + Err(e) => { + return Err(VoiceGatewayError::CannotConnect { + error: e.to_string(), + }) + } + }; + + Ok(websocket_stream.split()) + } +} + +impl From for tungstenite::Message { + fn from(message: VoiceGatewayMesssage) -> Self { + Self::Text(message.0) + } +} + +impl From for VoiceGatewayMesssage { + fn from(value: tungstenite::Message) -> Self { + Self(value.to_string()) + } +} diff --git a/src/voice/gateway/backends/wasm.rs b/src/voice/gateway/backends/wasm.rs new file mode 100644 index 0000000..588c882 --- /dev/null +++ b/src/voice/gateway/backends/wasm.rs @@ -0,0 +1,48 @@ +use futures_util::{ + stream::{SplitSink, SplitStream}, + StreamExt, +}; + +use ws_stream_wasm::*; + +use crate::errors::VoiceGatewayError; +use crate::voice::gateway::VoiceGatewayMessage; + +#[derive(Debug, Clone)] +pub struct WasmBackend; + +// These could be made into inherent associated types when that's stabilized +pub type WasmSink = SplitSink; +pub type WasmStream = SplitStream; + +impl WasmBackend { + pub async fn connect(websocket_url: &str) -> Result<(WasmSink, WasmStream), VoiceGatewayError> { + let (_, websocket_stream) = match WsMeta::connect(websocket_url, None).await { + Ok(stream) => Ok(stream), + Err(e) => Err(VoiceGatewayError::CannotConnect { + error: e.to_string(), + }), + }?; + + Ok(websocket_stream.split()) + } +} + +impl From for WsMessage { + fn from(message: VoiceGatewayMessage) -> Self { + Self::Text(message.0) + } +} + +impl From for VoiceGatewayMessage { + fn from(value: WsMessage) -> Self { + match value { + WsMessage::Text(text) => Self(text), + WsMessage::Binary(bin) => { + let mut text = String::new(); + let _ = bin.iter().map(|v| text.push_str(&v.to_string())); + Self(text) + } + } + } +} diff --git a/src/voice/gateway/events.rs b/src/voice/gateway/events.rs new file mode 100644 index 0000000..a0f018c --- /dev/null +++ b/src/voice/gateway/events.rs @@ -0,0 +1,24 @@ +use crate::{ + errors::VoiceGatewayError, + gateway::GatewayEvent, + types::{ + SessionDescription, SessionUpdate, Speaking, SsrcDefinition, VoiceBackendVersion, + VoiceClientConnectFlags, VoiceClientConnectPlatform, VoiceClientDisconnection, + VoiceMediaSinkWants, VoiceReady, + }, +}; + +#[derive(Default, Debug)] +pub struct VoiceEvents { + pub voice_ready: GatewayEvent, + pub backend_version: GatewayEvent, + pub session_description: GatewayEvent, + pub session_update: GatewayEvent, + pub speaking: GatewayEvent, + pub ssrc_definition: GatewayEvent, + pub client_disconnect: GatewayEvent, + pub client_connect_flags: GatewayEvent, + pub client_connect_platform: GatewayEvent, + pub media_sink_wants: GatewayEvent, + pub error: GatewayEvent, +} diff --git a/src/voice/gateway/gateway.rs b/src/voice/gateway/gateway.rs new file mode 100644 index 0000000..e6960e9 --- /dev/null +++ b/src/voice/gateway/gateway.rs @@ -0,0 +1,335 @@ +use std::{sync::Arc, time::Duration}; + +use log::*; + +use tokio::sync::Mutex; + +use futures_util::SinkExt; +use futures_util::StreamExt; + +use crate::{ + errors::VoiceGatewayError, + gateway::GatewayEvent, + types::{ + VoiceGatewayReceivePayload, VoiceHelloData, WebSocketEvent, VOICE_BACKEND_VERSION, + VOICE_CLIENT_CONNECT_FLAGS, VOICE_CLIENT_CONNECT_PLATFORM, VOICE_CLIENT_DISCONNECT, + VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK, VOICE_HELLO, VOICE_IDENTIFY, VOICE_MEDIA_SINK_WANTS, + VOICE_READY, VOICE_RESUME, VOICE_SELECT_PROTOCOL, VOICE_SESSION_DESCRIPTION, + VOICE_SESSION_UPDATE, VOICE_SPEAKING, VOICE_SSRC_DEFINITION, + }, + voice::gateway::{ + heartbeat::VoiceHeartbeatThreadCommunication, VoiceGatewayMesssage, WebSocketBackend, + }, +}; + +use super::{ + events::VoiceEvents, heartbeat::VoiceHeartbeatHandler, Sink, Stream, VoiceGatewayHandle, +}; + +#[derive(Debug)] +pub struct VoiceGateway { + events: Arc>, + heartbeat_handler: VoiceHeartbeatHandler, + websocket_send: Arc>, + websocket_receive: Stream, + kill_send: tokio::sync::broadcast::Sender<()>, +} + +impl VoiceGateway { + #[allow(clippy::new_ret_no_self)] + pub async fn spawn(websocket_url: String) -> Result { + // Append the needed things to the websocket url + let processed_url = format!("wss://{}/?v=7", websocket_url); + trace!("Created voice socket url: {}", processed_url.clone()); + + let (websocket_send, mut websocket_receive) = + WebSocketBackend::connect(&websocket_url).await?; + + let shared_websocket_send = Arc::new(Mutex::new(websocket_send)); + + // Create a shared broadcast channel for killing all gateway tasks + let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16); + + // Wait for the first hello and then spawn both tasks so we avoid nested tasks + // This automatically spawns the heartbeat task, but from the main thread + #[cfg(not(target_arch = "wasm32"))] + let msg: VoiceGatewayMesssage = websocket_receive.next().await.unwrap().unwrap().into(); + #[cfg(target_arch = "wasm32")] + let msg: VoiceGatewayMessage = websocket_receive.next().await.unwrap().into(); + let gateway_payload: VoiceGatewayReceivePayload = serde_json::from_str(&msg.0).unwrap(); + + if gateway_payload.op_code != VOICE_HELLO { + return Err(VoiceGatewayError::NonHelloOnInitiate { + opcode: gateway_payload.op_code, + }); + } + + info!("VGW: Received Hello"); + + // The hello data for voice gateways is in float milliseconds, so we convert it to f64 seconds + let gateway_hello: VoiceHelloData = + serde_json::from_str(gateway_payload.data.get()).unwrap(); + let heartbeat_interval_seconds: f64 = gateway_hello.heartbeat_interval / 1000.0; + + let voice_events = VoiceEvents::default(); + let shared_events = Arc::new(Mutex::new(voice_events)); + + let mut gateway = VoiceGateway { + events: shared_events.clone(), + heartbeat_handler: VoiceHeartbeatHandler::new( + Duration::from_secs_f64(heartbeat_interval_seconds), + 1, // to:do actually compute nonce + shared_websocket_send.clone(), + kill_send.subscribe(), + ), + websocket_send: shared_websocket_send.clone(), + websocket_receive, + kill_send: kill_send.clone(), + }; + + // Now we can continuously check for messages in a different task, since we aren't going to receive another hello + #[cfg(not(target_arch = "wasm32"))] + tokio::task::spawn(async move { + gateway.gateway_listen_task().await; + }); + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(async move { + gateway.gateway_listen_task().await; + }); + + Ok(VoiceGatewayHandle { + url: websocket_url.clone(), + events: shared_events, + websocket_send: shared_websocket_send.clone(), + kill_send: kill_send.clone(), + }) + } + + /// The main gateway listener task; + /// + /// Can only be stopped by closing the websocket, cannot be made to listen for kill + pub async fn gateway_listen_task(&mut self) { + loop { + let msg = self.websocket_receive.next().await; + + // PRETTYFYME: Remove inline conditional compiling + #[cfg(not(target_arch = "wasm32"))] + if let Some(Ok(message)) = msg { + self.handle_message(message.into()).await; + continue; + } + #[cfg(target_arch = "wasm32")] + if let Some(message) = msg { + self.handle_message(message.into()).await; + continue; + } + + // We couldn't receive the next message or it was an error, something is wrong with the websocket, close + warn!("VGW: Websocket is broken, stopping gateway"); + break; + } + } + + /// Closes the websocket connection and stops all tasks + async fn close(&mut self) { + self.kill_send.send(()).unwrap(); + self.websocket_send.lock().await.close().await.unwrap(); + } + + /// Deserializes and updates a dispatched event, when we already know its type; + /// (Called for every event in handle_message) + async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>( + data: &'a str, + event: &mut GatewayEvent, + ) -> Result<(), serde_json::Error> { + let data_deserialize_result: Result = serde_json::from_str(data); + + if data_deserialize_result.is_err() { + return Err(data_deserialize_result.err().unwrap()); + } + + event.notify(data_deserialize_result.unwrap()).await; + Ok(()) + } + + /// This handles a message as a websocket event and updates its events along with the events' observers + pub async fn handle_message(&mut self, msg: VoiceGatewayMesssage) { + if msg.0.is_empty() { + return; + } + + let Ok(gateway_payload) = msg.payload() else { + if let Some(error) = msg.error() { + warn!("GW: Received error {:?}, connection will close..", error); + self.close().await; + self.events.lock().await.error.notify(error).await; + } else { + warn!( + "Message unrecognised: {:?}, please open an issue on the chorus github", + msg.0 + ); + } + return; + }; + + // See + match gateway_payload.op_code { + VOICE_READY => { + trace!("VGW: Received READY!"); + + let event = &mut self.events.lock().await.voice_ready; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!("Failed to parse VOICE_READY ({})", result.err().unwrap()); + } + } + VOICE_BACKEND_VERSION => { + trace!("VGW: Received Backend Version"); + + let event = &mut self.events.lock().await.backend_version; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!( + "Failed to parse VOICE_BACKEND_VERSION ({})", + result.err().unwrap() + ); + } + } + VOICE_SESSION_DESCRIPTION => { + trace!("VGW: Received Session Description"); + + let event = &mut self.events.lock().await.session_description; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!( + "Failed to parse VOICE_SESSION_DESCRIPTION ({})", + result.err().unwrap() + ); + } + } + VOICE_SESSION_UPDATE => { + trace!("VGW: Received Session Update"); + + let event = &mut self.events.lock().await.session_update; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!( + "Failed to parse VOICE_SESSION_UPDATE ({})", + result.err().unwrap() + ); + } + } + VOICE_SPEAKING => { + trace!("VGW: Received Speaking"); + + let event = &mut self.events.lock().await.speaking; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!("Failed to parse VOICE_SPEAKING ({})", result.err().unwrap()); + } + } + VOICE_SSRC_DEFINITION => { + trace!("VGW: Received Ssrc Definition"); + + let event = &mut self.events.lock().await.ssrc_definition; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!( + "Failed to parse VOICE_SSRC_DEFINITION ({})", + result.err().unwrap() + ); + } + } + VOICE_CLIENT_DISCONNECT => { + trace!("VGW: Received Client Disconnect"); + + let event = &mut self.events.lock().await.client_disconnect; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!( + "Failed to parse VOICE_CLIENT_DISCONNECT ({})", + result.err().unwrap() + ); + } + } + VOICE_CLIENT_CONNECT_FLAGS => { + trace!("VGW: Received Client Connect Flags"); + + let event = &mut self.events.lock().await.client_connect_flags; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!( + "Failed to parse VOICE_CLIENT_CONNECT_FLAGS ({})", + result.err().unwrap() + ); + } + } + VOICE_CLIENT_CONNECT_PLATFORM => { + trace!("VGW: Received Client Connect Platform"); + + let event = &mut self.events.lock().await.client_connect_platform; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!( + "Failed to parse VOICE_CLIENT_CONNECT_PLATFORM ({})", + result.err().unwrap() + ); + } + } + VOICE_MEDIA_SINK_WANTS => { + trace!("VGW: Received Media Sink Wants"); + + let event = &mut self.events.lock().await.media_sink_wants; + let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await; + if result.is_err() { + warn!( + "Failed to parse VOICE_MEDIA_SINK_WANTS ({})", + result.err().unwrap() + ); + } + } + // We received a heartbeat from the server + // "Discord may send the app a Heartbeat (opcode 1) event, in which case the app should send a Heartbeat event immediately." + VOICE_HEARTBEAT => { + trace!("VGW: Received Heartbeat // Heartbeat Request"); + + // Tell the heartbeat handler it should send a heartbeat right away + let heartbeat_communication = VoiceHeartbeatThreadCommunication { + updated_nonce: None, + op_code: Some(VOICE_HEARTBEAT), + }; + + self.heartbeat_handler + .send + .send(heartbeat_communication) + .await + .unwrap(); + } + VOICE_HEARTBEAT_ACK => { + trace!("VGW: Received Heartbeat ACK"); + + // Tell the heartbeat handler we received an ack + + let heartbeat_communication = VoiceHeartbeatThreadCommunication { + updated_nonce: None, + op_code: Some(VOICE_HEARTBEAT_ACK), + }; + + self.heartbeat_handler + .send + .send(heartbeat_communication) + .await + .unwrap(); + } + VOICE_IDENTIFY | VOICE_SELECT_PROTOCOL | VOICE_RESUME => { + info!( + "VGW: Received unexpected opcode ({}) for current state. This might be due to a faulty server implementation and is likely not the fault of chorus.", + gateway_payload.op_code + ); + } + _ => { + warn!("VGW: Received unrecognized voice gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code); + } + } + } +} diff --git a/src/voice/gateway/handle.rs b/src/voice/gateway/handle.rs new file mode 100644 index 0000000..d24adbf --- /dev/null +++ b/src/voice/gateway/handle.rs @@ -0,0 +1,101 @@ +use std::sync::Arc; + +use log::*; + +use futures_util::SinkExt; + +use serde_json::json; +use tokio::sync::Mutex; + +use crate::types::{ + SelectProtocol, Speaking, SsrcDefinition, VoiceGatewaySendPayload, VoiceIdentify, + VOICE_BACKEND_VERSION, VOICE_IDENTIFY, VOICE_SELECT_PROTOCOL, VOICE_SPEAKING, + VOICE_SSRC_DEFINITION, +}; + +use super::{events::VoiceEvents, Sink, VoiceGatewayMesssage}; + +/// Represents a handle to a Voice Gateway connection. +/// Using this handle you can send Gateway Events directly. +#[derive(Debug, Clone)] +pub struct VoiceGatewayHandle { + pub url: String, + pub events: Arc>, + pub websocket_send: Arc>, + /// Tells gateway tasks to close + pub(super) kill_send: tokio::sync::broadcast::Sender<()>, +} + +impl VoiceGatewayHandle { + /// Sends json to the gateway with an opcode + async fn send_json(&self, op_code: u8, to_send: serde_json::Value) { + let gateway_payload = VoiceGatewaySendPayload { + op_code, + data: to_send, + }; + + let payload_json = serde_json::to_string(&gateway_payload).unwrap(); + let message = VoiceGatewayMesssage(payload_json); + + self.websocket_send + .lock() + .await + .send(message.into()) + .await + .unwrap(); + } + + /// Sends a voice identify event to the gateway + pub async fn send_identify(&self, to_send: VoiceIdentify) { + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + trace!("VGW: Sending Identify.."); + + self.send_json(VOICE_IDENTIFY, to_send_value).await; + } + + /// Sends a select protocol event to the gateway + pub async fn send_select_protocol(&self, to_send: SelectProtocol) { + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + trace!("VGW: Sending Select Protocol"); + + self.send_json(VOICE_SELECT_PROTOCOL, to_send_value).await; + } + + /// Sends a speaking event to the gateway + pub async fn send_speaking(&self, to_send: Speaking) { + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + trace!("VGW: Sending Speaking"); + + self.send_json(VOICE_SPEAKING, to_send_value).await; + } + + /// Sends an ssrc definition event + pub async fn send_ssrc_definition(&self, to_send: SsrcDefinition) { + let to_send_value = serde_json::to_value(&to_send).unwrap(); + + trace!("VGW: Sending SsrcDefinition"); + + self.send_json(VOICE_SSRC_DEFINITION, to_send_value).await; + } + + /// Sends a voice backend version request to the gateway + pub async fn send_voice_backend_version_request(&self) { + let data_empty_object = json!("{}"); + + trace!("VGW: Requesting voice backend version"); + + self.send_json(VOICE_BACKEND_VERSION, data_empty_object) + .await; + } + + /// Closes the websocket connection and stops all gateway tasks; + /// + /// Esentially pulls the plug on the voice gateway, leaving it possible to resume; + pub async fn close(&self) { + self.kill_send.send(()).unwrap(); + self.websocket_send.lock().await.close().await.unwrap(); + } +} diff --git a/src/voice/gateway/heartbeat.rs b/src/voice/gateway/heartbeat.rs new file mode 100644 index 0000000..afd7033 --- /dev/null +++ b/src/voice/gateway/heartbeat.rs @@ -0,0 +1,160 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use futures_util::SinkExt; +use log::*; +use safina_timer::sleep_until; +use tokio::sync::{mpsc::Sender, Mutex}; + +use crate::{ + gateway::heartbeat::HEARTBEAT_ACK_TIMEOUT, + types::{VoiceGatewaySendPayload, VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK}, + voice::gateway::VoiceGatewayMesssage, +}; + +use super::Sink; + +/// Handles sending heartbeats to the voice gateway in another thread +#[allow(dead_code)] // FIXME: Remove this, once all fields of VoiceHeartbeatHandler are used +#[derive(Debug)] +pub(super) struct VoiceHeartbeatHandler { + /// The heartbeat interval in milliseconds + pub heartbeat_interval: Duration, + /// The send channel for the heartbeat thread + pub send: Sender, +} + +impl VoiceHeartbeatHandler { + pub fn new( + heartbeat_interval: Duration, + starting_nonce: u64, + websocket_tx: Arc>, + kill_rc: tokio::sync::broadcast::Receiver<()>, + ) -> Self { + let (send, receive) = tokio::sync::mpsc::channel(32); + let kill_receive = kill_rc.resubscribe(); + + #[cfg(not(target_arch = "wasm32"))] + tokio::task::spawn(async move { + Self::heartbeat_task( + websocket_tx, + heartbeat_interval, + starting_nonce, + receive, + kill_receive, + ) + .await; + }); + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(async move { + Self::heartbeat_task( + websocket_tx, + heartbeat_interval, + starting_nonce, + receive, + kill_receive, + ) + .await; + }); + + Self { + heartbeat_interval, + send, + } + } + + /// The main heartbeat task; + /// + /// Can be killed by the kill broadcast; + /// If the websocket is closed, will die out next time it tries to send a heartbeat; + pub async fn heartbeat_task( + websocket_tx: Arc>, + heartbeat_interval: Duration, + starting_nonce: u64, + mut receive: tokio::sync::mpsc::Receiver, + mut kill_receive: tokio::sync::broadcast::Receiver<()>, + ) { + let mut last_heartbeat_timestamp: Instant = Instant::now(); + let mut last_heartbeat_acknowledged = true; + let mut nonce: u64 = starting_nonce; + + safina_timer::start_timer_thread(); + + loop { + if kill_receive.try_recv().is_ok() { + trace!("VGW: Closing heartbeat task"); + break; + } + + let timeout = if last_heartbeat_acknowledged { + heartbeat_interval + } else { + // If the server hasn't acknowledged our heartbeat we should resend it + Duration::from_millis(HEARTBEAT_ACK_TIMEOUT) + }; + + let mut should_send = false; + + tokio::select! { + () = sleep_until(last_heartbeat_timestamp + timeout) => { + should_send = true; + } + Some(communication) = receive.recv() => { + // If we received a nonce update, use that nonce now + if communication.updated_nonce.is_some() { + nonce = communication.updated_nonce.unwrap(); + } + + if let Some(op_code) = communication.op_code { + match op_code { + VOICE_HEARTBEAT => { + // As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately + should_send = true; + } + VOICE_HEARTBEAT_ACK => { + // The server received our heartbeat + last_heartbeat_acknowledged = true; + } + _ => {} + } + } + } + } + + if should_send { + trace!("VGW: Sending Heartbeat.."); + + let heartbeat = VoiceGatewaySendPayload { + op_code: VOICE_HEARTBEAT, + data: nonce.into(), + }; + + let heartbeat_json = serde_json::to_string(&heartbeat).unwrap(); + + let msg = VoiceGatewayMesssage(heartbeat_json); + + let send_result = websocket_tx.lock().await.send(msg.into()).await; + if send_result.is_err() { + // We couldn't send, the websocket is broken + warn!("VGW: Couldnt send heartbeat, websocket seems broken"); + break; + } + + last_heartbeat_timestamp = Instant::now(); + last_heartbeat_acknowledged = false; + } + } + } +} + +/// Used for communications between the voice heartbeat and voice gateway thread. +/// Either signifies a nonce update, a heartbeat ACK or a Heartbeat request by the server +#[derive(Clone, Copy, Debug)] +pub(super) struct VoiceHeartbeatThreadCommunication { + /// The opcode for the communication we received, if relevant + pub(super) op_code: Option, + /// The new nonce to use, if any + pub(super) updated_nonce: Option, +} diff --git a/src/voice/gateway/message.rs b/src/voice/gateway/message.rs new file mode 100644 index 0000000..ff723c3 --- /dev/null +++ b/src/voice/gateway/message.rs @@ -0,0 +1,39 @@ +use crate::{errors::VoiceGatewayError, types::VoiceGatewayReceivePayload}; + +/// Represents a messsage received from the webrtc socket. This will be either a [GatewayReceivePayload], containing webrtc events, or a [WebrtcError]. +/// This struct is used internally when handling messages. +#[derive(Clone, Debug)] +pub struct VoiceGatewayMesssage(pub String); + +impl VoiceGatewayMesssage { + /// Parses the message as an error; + /// Returns the error if succesfully parsed, None if the message isn't an error + pub fn error(&self) -> Option { + // Some error strings have dots on the end, which we don't care about + let processed_content = self.0.to_lowercase().replace('.', ""); + + match processed_content.as_str() { + "unknown opcode" | "4001" => Some(VoiceGatewayError::UnknownOpcode), + "decode error" | "failed to decode payload" | "4002" => { + Some(VoiceGatewayError::FailedToDecodePayload) + } + "not authenticated" | "4003" => Some(VoiceGatewayError::NotAuthenticated), + "authentication failed" | "4004" => Some(VoiceGatewayError::AuthenticationFailed), + "already authenticated" | "4005" => Some(VoiceGatewayError::AlreadyAuthenticated), + "session is no longer valid" | "4006" => Some(VoiceGatewayError::SessionNoLongerValid), + "session timeout" | "4009" => Some(VoiceGatewayError::SessionTimeout), + "server not found" | "4011" => Some(VoiceGatewayError::ServerNotFound), + "unknown protocol" | "4012" => Some(VoiceGatewayError::UnknownProtocol), + "disconnected" | "4014" => Some(VoiceGatewayError::Disconnected), + "voice server crashed" | "4015" => Some(VoiceGatewayError::VoiceServerCrashed), + "unknown encryption mode" | "4016" => Some(VoiceGatewayError::UnknownEncryptionMode), + _ => None, + } + } + + /// Parses the message as a payload; + /// Returns a result of deserializing + pub fn payload(&self) -> Result { + return serde_json::from_str(&self.0); + } +} diff --git a/src/voice/gateway/mod.rs b/src/voice/gateway/mod.rs new file mode 100644 index 0000000..4819663 --- /dev/null +++ b/src/voice/gateway/mod.rs @@ -0,0 +1,11 @@ +pub mod backends; +pub mod events; +pub mod gateway; +pub mod handle; +pub mod heartbeat; +pub mod message; + +pub use backends::*; +pub use gateway::*; +pub use handle::*; +pub use message::*; diff --git a/src/voice/udp.rs b/src/voice/udp.rs index 9b6f9cf..9fbfd79 100644 --- a/src/voice/udp.rs +++ b/src/voice/udp.rs @@ -344,7 +344,7 @@ impl UdpHandler { } Demuxed::Rtcp(rtcp) => { trace!("VUDP: Parsed packet as rtcp!"); - + let rtcp_data = match rtcp { discortp::rtcp::RtcpPacket::KnownType(knowntype) => { discortp::rtcp::Rtcp::KnownType(knowntype)