Modernise voice gateway

This commit is contained in:
kozabrada123 2023-10-14 09:58:26 +02:00
parent 68b6ff4ca7
commit e4f0a3840a
2 changed files with 22 additions and 17 deletions

View File

@ -114,3 +114,5 @@ custom_error! {
// Other misc errors // Other misc errors
UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}", UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}",
} }
impl WebSocketEvent for VoiceGatewayError {}

View File

@ -24,6 +24,8 @@ use crate::types::{
VOICE_SESSION_DESCRIPTION, VOICE_SPEAKING, VOICE_SESSION_DESCRIPTION, VOICE_SPEAKING,
}; };
use self::voice_events::VoiceEvents;
/// Represents a messsage received from the webrtc socket. This will be either a [GatewayReceivePayload], containing webrtc events, or a [WebrtcError]. /// Represents a messsage received from the webrtc socket. This will be either a [GatewayReceivePayload], containing webrtc events, or a [WebrtcError].
/// This struct is used internally when handling messages. /// This struct is used internally when handling messages.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -94,10 +96,10 @@ impl VoiceGatewayMesssage {
/// Represents a handle to a Voice Gateway connection. /// Represents a handle to a Voice Gateway connection.
/// Using this handle you can send Gateway Events directly. /// Using this handle you can send Gateway Events directly.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct VoiceGatewayHandle { pub struct VoiceGatewayHandle {
pub url: String, pub url: String,
pub events: Arc<Mutex<voice_events::VoiceEvents>>, pub events: Arc<Mutex<VoiceEvents>>,
pub websocket_send: Arc< pub websocket_send: Arc<
Mutex< Mutex<
SplitSink< SplitSink<
@ -106,7 +108,6 @@ pub struct VoiceGatewayHandle {
>, >,
>, >,
>, >,
pub handle: JoinHandle<()>,
/// Tells gateway tasks to close /// Tells gateway tasks to close
kill_send: tokio::sync::broadcast::Sender<()>, kill_send: tokio::sync::broadcast::Sender<()>,
} }
@ -179,7 +180,7 @@ impl VoiceGatewayHandle {
#[derive(Debug)] #[derive(Debug)]
pub struct VoiceGateway { pub struct VoiceGateway {
events: Arc<Mutex<voice_events::VoiceEvents>>, events: Arc<Mutex<VoiceEvents>>,
heartbeat_handler: VoiceHeartbeatHandler, heartbeat_handler: VoiceHeartbeatHandler,
websocket_send: Arc< websocket_send: Arc<
Mutex< Mutex<
@ -191,6 +192,7 @@ pub struct VoiceGateway {
>, >,
websocket_receive: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, websocket_receive: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
kill_send: tokio::sync::broadcast::Sender<()>, kill_send: tokio::sync::broadcast::Sender<()>,
url: String,
} }
impl VoiceGateway { impl VoiceGateway {
@ -242,7 +244,7 @@ impl VoiceGateway {
let gateway_hello: types::HelloData = let gateway_hello: types::HelloData =
serde_json::from_str(gateway_payload.data.get()).unwrap(); serde_json::from_str(gateway_payload.data.get()).unwrap();
let voice_events = voice_events::VoiceEvents::default(); let voice_events = VoiceEvents::default();
let shared_events = Arc::new(Mutex::new(voice_events)); let shared_events = Arc::new(Mutex::new(voice_events));
let mut gateway = VoiceGateway { let mut gateway = VoiceGateway {
@ -256,10 +258,11 @@ impl VoiceGateway {
websocket_send: shared_websocket_send.clone(), websocket_send: shared_websocket_send.clone(),
websocket_receive, websocket_receive,
kill_send: kill_send.clone(), kill_send: kill_send.clone(),
url: websocket_url.clone(),
}; };
// Now we can continuously check for messages in a different task, since we aren't going to receive another hello // Now we can continuously check for messages in a different task, since we aren't going to receive another hello
let handle: JoinHandle<()> = tokio::spawn(async move { tokio::spawn(async move {
gateway.gateway_listen_task().await; gateway.gateway_listen_task().await;
}); });
@ -267,7 +270,6 @@ impl VoiceGateway {
url: websocket_url.clone(), url: websocket_url.clone(),
events: shared_events, events: shared_events,
websocket_send: shared_websocket_send.clone(), websocket_send: shared_websocket_send.clone(),
handle,
kill_send: kill_send.clone(), kill_send: kill_send.clone(),
}) })
} }
@ -327,18 +329,21 @@ impl VoiceGateway {
return; return;
} }
// To:do: handle errors in a good way, maybe observers like events?
if msg.is_error() { if msg.is_error() {
let error = msg.error().unwrap();
warn!("VGW: Received error, connection will close.."); warn!("VGW: Received error, connection will close..");
let _error = msg.error();
self.close().await; self.close().await;
self.events.lock().await.error.notify(error).await;
return; return;
} }
let gateway_payload = msg.payload().unwrap(); let gateway_payload = msg.payload().unwrap();
// See <https://discord.com/developers/docs/topics/voice-connections>
match gateway_payload.op_code { match gateway_payload.op_code {
VOICE_READY => { VOICE_READY => {
let event = &mut self.events.lock().await.voice_ready; let event = &mut self.events.lock().await.voice_ready;
@ -493,14 +498,11 @@ impl VoiceHeartbeatHandler {
() = sleep_until(last_heartbeat_timestamp + timeout) => { () = sleep_until(last_heartbeat_timestamp + timeout) => {
should_send = true; should_send = true;
} }
Some(communication) = receive.recv() => { Some(communication) = receive.recv() => {
// If we received a nonce update, use that nonce now // If we received a nonce update, use that nonce now
if communication.updated_nonce.is_some() { if communication.updated_nonce.is_some() {
nonce = communication.updated_nonce.unwrap(); nonce = communication.updated_nonce.unwrap();
} }
if let Some(op_code) = communication.op_code { if let Some(op_code) = communication.op_code {
match op_code { match op_code {
@ -554,7 +556,7 @@ struct VoiceHeartbeatThreadCommunication {
updated_nonce: Option<u64>, updated_nonce: Option<u64>,
} }
mod voice_events { pub mod voice_events {
use crate::types::{SessionDescription, VoiceReady}; use crate::types::{SessionDescription, VoiceReady};
use super::*; use super::*;
@ -563,5 +565,6 @@ mod voice_events {
pub struct VoiceEvents { pub struct VoiceEvents {
pub voice_ready: GatewayEvent<VoiceReady>, pub voice_ready: GatewayEvent<VoiceReady>,
pub session_description: GatewayEvent<SessionDescription>, pub session_description: GatewayEvent<SessionDescription>,
pub error: GatewayEvent<VoiceGatewayError>,
} }
} }