diff --git a/src/errors.rs b/src/errors.rs index 4a251c9..e8075c2 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -114,3 +114,5 @@ custom_error! { // Other misc errors UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}", } + +impl WebSocketEvent for VoiceGatewayError {} diff --git a/src/voice.rs b/src/voice.rs index 14e883d..e6583b0 100644 --- a/src/voice.rs +++ b/src/voice.rs @@ -24,6 +24,8 @@ use crate::types::{ VOICE_SESSION_DESCRIPTION, VOICE_SPEAKING, }; +use self::voice_events::VoiceEvents; + /// Represents a messsage received from the webrtc socket. This will be either a [GatewayReceivePayload], containing webrtc events, or a [WebrtcError]. /// This struct is used internally when handling messages. #[derive(Clone, Debug)] @@ -94,10 +96,10 @@ impl VoiceGatewayMesssage { /// Represents a handle to a Voice Gateway connection. /// Using this handle you can send Gateway Events directly. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct VoiceGatewayHandle { pub url: String, - pub events: Arc>, + pub events: Arc>, pub websocket_send: Arc< Mutex< SplitSink< @@ -106,7 +108,6 @@ pub struct VoiceGatewayHandle { >, >, >, - pub handle: JoinHandle<()>, /// Tells gateway tasks to close kill_send: tokio::sync::broadcast::Sender<()>, } @@ -179,7 +180,7 @@ impl VoiceGatewayHandle { #[derive(Debug)] pub struct VoiceGateway { - events: Arc>, + events: Arc>, heartbeat_handler: VoiceHeartbeatHandler, websocket_send: Arc< Mutex< @@ -191,6 +192,7 @@ pub struct VoiceGateway { >, websocket_receive: SplitStream>>, kill_send: tokio::sync::broadcast::Sender<()>, + url: String, } impl VoiceGateway { @@ -242,7 +244,7 @@ impl VoiceGateway { let gateway_hello: types::HelloData = serde_json::from_str(gateway_payload.data.get()).unwrap(); - let voice_events = voice_events::VoiceEvents::default(); + let voice_events = VoiceEvents::default(); let shared_events = Arc::new(Mutex::new(voice_events)); let mut gateway = VoiceGateway { @@ -256,10 +258,11 @@ impl VoiceGateway { websocket_send: shared_websocket_send.clone(), websocket_receive, kill_send: kill_send.clone(), + url: websocket_url.clone(), }; // Now we can continuously check for messages in a different task, since we aren't going to receive another hello - let handle: JoinHandle<()> = tokio::spawn(async move { + tokio::spawn(async move { gateway.gateway_listen_task().await; }); @@ -267,7 +270,6 @@ impl VoiceGateway { url: websocket_url.clone(), events: shared_events, websocket_send: shared_websocket_send.clone(), - handle, kill_send: kill_send.clone(), }) } @@ -327,18 +329,21 @@ impl VoiceGateway { return; } - // To:do: handle errors in a good way, maybe observers like events? if msg.is_error() { + let error = msg.error().unwrap(); + warn!("VGW: Received error, connection will close.."); - let _error = msg.error(); - self.close().await; + + self.events.lock().await.error.notify(error).await; + return; } let gateway_payload = msg.payload().unwrap(); + // See match gateway_payload.op_code { VOICE_READY => { let event = &mut self.events.lock().await.voice_ready; @@ -493,14 +498,11 @@ impl VoiceHeartbeatHandler { () = sleep_until(last_heartbeat_timestamp + timeout) => { should_send = true; } - - Some(communication) = receive.recv() => { - // If we received a nonce update, use that nonce now - if communication.updated_nonce.is_some() { - nonce = communication.updated_nonce.unwrap(); - } + if communication.updated_nonce.is_some() { + nonce = communication.updated_nonce.unwrap(); + } if let Some(op_code) = communication.op_code { match op_code { @@ -554,7 +556,7 @@ struct VoiceHeartbeatThreadCommunication { updated_nonce: Option, } -mod voice_events { +pub mod voice_events { use crate::types::{SessionDescription, VoiceReady}; use super::*; @@ -563,5 +565,6 @@ mod voice_events { pub struct VoiceEvents { pub voice_ready: GatewayEvent, pub session_description: GatewayEvent, + pub error: GatewayEvent, } }