Modernise voice gateway
This commit is contained in:
parent
efc2fe2ba3
commit
f19a2eaf21
|
@ -114,3 +114,5 @@ custom_error! {
|
||||||
// Other misc errors
|
// Other misc errors
|
||||||
UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}",
|
UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl WebSocketEvent for VoiceGatewayError {}
|
||||||
|
|
37
src/voice.rs
37
src/voice.rs
|
@ -24,6 +24,8 @@ use crate::types::{
|
||||||
VOICE_SESSION_DESCRIPTION, VOICE_SPEAKING,
|
VOICE_SESSION_DESCRIPTION, VOICE_SPEAKING,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use self::voice_events::VoiceEvents;
|
||||||
|
|
||||||
/// Represents a messsage received from the webrtc socket. This will be either a [GatewayReceivePayload], containing webrtc events, or a [WebrtcError].
|
/// Represents a messsage received from the webrtc socket. This will be either a [GatewayReceivePayload], containing webrtc events, or a [WebrtcError].
|
||||||
/// This struct is used internally when handling messages.
|
/// This struct is used internally when handling messages.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -94,10 +96,10 @@ impl VoiceGatewayMesssage {
|
||||||
|
|
||||||
/// Represents a handle to a Voice Gateway connection.
|
/// Represents a handle to a Voice Gateway connection.
|
||||||
/// Using this handle you can send Gateway Events directly.
|
/// Using this handle you can send Gateway Events directly.
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct VoiceGatewayHandle {
|
pub struct VoiceGatewayHandle {
|
||||||
pub url: String,
|
pub url: String,
|
||||||
pub events: Arc<Mutex<voice_events::VoiceEvents>>,
|
pub events: Arc<Mutex<VoiceEvents>>,
|
||||||
pub websocket_send: Arc<
|
pub websocket_send: Arc<
|
||||||
Mutex<
|
Mutex<
|
||||||
SplitSink<
|
SplitSink<
|
||||||
|
@ -106,7 +108,6 @@ pub struct VoiceGatewayHandle {
|
||||||
>,
|
>,
|
||||||
>,
|
>,
|
||||||
>,
|
>,
|
||||||
pub handle: JoinHandle<()>,
|
|
||||||
/// Tells gateway tasks to close
|
/// Tells gateway tasks to close
|
||||||
kill_send: tokio::sync::broadcast::Sender<()>,
|
kill_send: tokio::sync::broadcast::Sender<()>,
|
||||||
}
|
}
|
||||||
|
@ -179,7 +180,7 @@ impl VoiceGatewayHandle {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct VoiceGateway {
|
pub struct VoiceGateway {
|
||||||
events: Arc<Mutex<voice_events::VoiceEvents>>,
|
events: Arc<Mutex<VoiceEvents>>,
|
||||||
heartbeat_handler: VoiceHeartbeatHandler,
|
heartbeat_handler: VoiceHeartbeatHandler,
|
||||||
websocket_send: Arc<
|
websocket_send: Arc<
|
||||||
Mutex<
|
Mutex<
|
||||||
|
@ -191,6 +192,7 @@ pub struct VoiceGateway {
|
||||||
>,
|
>,
|
||||||
websocket_receive: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
|
websocket_receive: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
|
||||||
kill_send: tokio::sync::broadcast::Sender<()>,
|
kill_send: tokio::sync::broadcast::Sender<()>,
|
||||||
|
url: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VoiceGateway {
|
impl VoiceGateway {
|
||||||
|
@ -242,7 +244,7 @@ impl VoiceGateway {
|
||||||
let gateway_hello: types::HelloData =
|
let gateway_hello: types::HelloData =
|
||||||
serde_json::from_str(gateway_payload.data.get()).unwrap();
|
serde_json::from_str(gateway_payload.data.get()).unwrap();
|
||||||
|
|
||||||
let voice_events = voice_events::VoiceEvents::default();
|
let voice_events = VoiceEvents::default();
|
||||||
let shared_events = Arc::new(Mutex::new(voice_events));
|
let shared_events = Arc::new(Mutex::new(voice_events));
|
||||||
|
|
||||||
let mut gateway = VoiceGateway {
|
let mut gateway = VoiceGateway {
|
||||||
|
@ -256,10 +258,11 @@ impl VoiceGateway {
|
||||||
websocket_send: shared_websocket_send.clone(),
|
websocket_send: shared_websocket_send.clone(),
|
||||||
websocket_receive,
|
websocket_receive,
|
||||||
kill_send: kill_send.clone(),
|
kill_send: kill_send.clone(),
|
||||||
|
url: websocket_url.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Now we can continuously check for messages in a different task, since we aren't going to receive another hello
|
// Now we can continuously check for messages in a different task, since we aren't going to receive another hello
|
||||||
let handle: JoinHandle<()> = tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
gateway.gateway_listen_task().await;
|
gateway.gateway_listen_task().await;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -267,7 +270,6 @@ impl VoiceGateway {
|
||||||
url: websocket_url.clone(),
|
url: websocket_url.clone(),
|
||||||
events: shared_events,
|
events: shared_events,
|
||||||
websocket_send: shared_websocket_send.clone(),
|
websocket_send: shared_websocket_send.clone(),
|
||||||
handle,
|
|
||||||
kill_send: kill_send.clone(),
|
kill_send: kill_send.clone(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -327,18 +329,21 @@ impl VoiceGateway {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// To:do: handle errors in a good way, maybe observers like events?
|
|
||||||
if msg.is_error() {
|
if msg.is_error() {
|
||||||
|
let error = msg.error().unwrap();
|
||||||
|
|
||||||
warn!("VGW: Received error, connection will close..");
|
warn!("VGW: Received error, connection will close..");
|
||||||
|
|
||||||
let _error = msg.error();
|
|
||||||
|
|
||||||
self.close().await;
|
self.close().await;
|
||||||
|
|
||||||
|
self.events.lock().await.error.notify(error).await;
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let gateway_payload = msg.payload().unwrap();
|
let gateway_payload = msg.payload().unwrap();
|
||||||
|
|
||||||
|
// See <https://discord.com/developers/docs/topics/voice-connections>
|
||||||
match gateway_payload.op_code {
|
match gateway_payload.op_code {
|
||||||
VOICE_READY => {
|
VOICE_READY => {
|
||||||
let event = &mut self.events.lock().await.voice_ready;
|
let event = &mut self.events.lock().await.voice_ready;
|
||||||
|
@ -493,14 +498,11 @@ impl VoiceHeartbeatHandler {
|
||||||
() = sleep_until(last_heartbeat_timestamp + timeout) => {
|
() = sleep_until(last_heartbeat_timestamp + timeout) => {
|
||||||
should_send = true;
|
should_send = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Some(communication) = receive.recv() => {
|
Some(communication) = receive.recv() => {
|
||||||
|
|
||||||
// If we received a nonce update, use that nonce now
|
// If we received a nonce update, use that nonce now
|
||||||
if communication.updated_nonce.is_some() {
|
if communication.updated_nonce.is_some() {
|
||||||
nonce = communication.updated_nonce.unwrap();
|
nonce = communication.updated_nonce.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(op_code) = communication.op_code {
|
if let Some(op_code) = communication.op_code {
|
||||||
match op_code {
|
match op_code {
|
||||||
|
@ -554,7 +556,7 @@ struct VoiceHeartbeatThreadCommunication {
|
||||||
updated_nonce: Option<u64>,
|
updated_nonce: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
mod voice_events {
|
pub mod voice_events {
|
||||||
use crate::types::{SessionDescription, VoiceReady};
|
use crate::types::{SessionDescription, VoiceReady};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -563,5 +565,6 @@ mod voice_events {
|
||||||
pub struct VoiceEvents {
|
pub struct VoiceEvents {
|
||||||
pub voice_ready: GatewayEvent<VoiceReady>,
|
pub voice_ready: GatewayEvent<VoiceReady>,
|
||||||
pub session_description: GatewayEvent<SessionDescription>,
|
pub session_description: GatewayEvent<SessionDescription>,
|
||||||
|
pub error: GatewayEvent<VoiceGatewayError>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue