Compare commits

..

No commits in common. "177dbaf0f6cf3bb8a997aa46214b1dd6d0cc1472" and "634e07b95c55ad0fe451b0b1b17379a965799bb7" have entirely different histories.

18 changed files with 50 additions and 84 deletions

View File

@ -40,7 +40,7 @@ use tokio::sync::{Mutex, RwLock};
extern crate chorus; extern crate chorus;
extern crate tokio; extern crate tokio;
/// Handles in between connections between the gateway and UDP modules /// Handles inbetween connections between the gateway and udp modules
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct VoiceHandler { pub struct VoiceHandler {
pub voice_gateway_connection: Arc<Mutex<Option<VoiceGatewayHandle>>>, pub voice_gateway_connection: Arc<Mutex<Option<VoiceGatewayHandle>>>,
@ -49,7 +49,7 @@ pub struct VoiceHandler {
} }
impl VoiceHandler { impl VoiceHandler {
/// Creates a new [VoiceHandler], only initializing the data /// Creates a new voicehandler, only initializing the data
pub fn new() -> VoiceHandler { pub fn new() -> VoiceHandler {
Self { Self {
data: Arc::new(RwLock::new(VoiceData::default())), data: Arc::new(RwLock::new(VoiceData::default())),
@ -66,7 +66,7 @@ impl Default for VoiceHandler {
} }
#[async_trait] #[async_trait]
// On [VoiceServerUpdate] we get our starting data and URL for the voice gateway server. // On [VoiceServerUpdate] we get our starting data and url for the voice gateway server.
impl Observer<VoiceServerUpdate> for VoiceHandler { impl Observer<VoiceServerUpdate> for VoiceHandler {
async fn update(&self, data: &VoiceServerUpdate) { async fn update(&self, data: &VoiceServerUpdate) {
let mut data_lock = self.data.write().await; let mut data_lock = self.data.write().await;
@ -121,7 +121,7 @@ impl Observer<VoiceServerUpdate> for VoiceHandler {
} }
#[async_trait] #[async_trait]
// On [VoiceReady] we get info for establishing a UDP connection, and we immediately need said UDP // On [VoiceReady] we get info for establishing a UDP connection, and we immedietly need said UDP
// connection for ip discovery. // connection for ip discovery.
impl Observer<VoiceReady> for VoiceHandler { impl Observer<VoiceReady> for VoiceHandler {
async fn update(&self, data: &VoiceReady) { async fn update(&self, data: &VoiceReady) {

View File

@ -140,7 +140,7 @@ custom_error! {
pub VoiceUdpError pub VoiceUdpError
// General errors // General errors
BrokenSocket{error: String} = "Could not write / read from UDP socket: {error}", BrokenSocket{error: String} = "Could not write / read from udp socket: {error}",
NoData = "We have not set received the necessary data to perform this operation.", NoData = "We have not set received the necessary data to perform this operation.",
// Encryption errors // Encryption errors
@ -151,8 +151,8 @@ custom_error! {
FailedNonceGeneration{error: String} = "Tried to generate nonce, but failed due to error: {error}.", FailedNonceGeneration{error: String} = "Tried to generate nonce, but failed due to error: {error}.",
// Errors when initiating a socket connection // Errors when initiating a socket connection
CannotBind{error: String} = "Cannot bind socket due to a UDP error: {error}", CannotBind{error: String} = "Cannot bind socket due to a udp error: {error}",
CannotConnect{error: String} = "Cannot connect due to a UDP error: {error}", CannotConnect{error: String} = "Cannot connect due to a udp error: {error}",
} }
impl WebSocketEvent for VoiceUdpError {} impl WebSocketEvent for VoiceUdpError {}

View File

@ -27,16 +27,8 @@ impl TungsteniteBackend {
websocket_url: &str, websocket_url: &str,
) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::GatewayError> { ) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::GatewayError> {
let mut roots = rustls::RootCertStore::empty(); let mut roots = rustls::RootCertStore::empty();
let certs = rustls_native_certs::load_native_certs(); for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs")
{
if let Err(e) = certs {
log::error!("Failed to load platform native certs! {:?}", e);
return Err(GatewayError::CannotConnect {
error: format!("{:?}", e),
});
}
for cert in certs.unwrap() {
roots.add(&rustls::Certificate(cert.0)).unwrap(); roots.add(&rustls::Certificate(cert.0)).unwrap();
} }
let (websocket_stream, _) = match connect_async_tls_with_config( let (websocket_stream, _) = match connect_async_tls_with_config(

View File

@ -26,7 +26,6 @@ pub struct Gateway {
websocket_send: Arc<Mutex<Sink>>, websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream, websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>, kill_send: tokio::sync::broadcast::Sender<()>,
kill_receive: tokio::sync::broadcast::Receiver<()>,
store: Arc<Mutex<HashMap<Snowflake, Arc<RwLock<ObservableObject>>>>>, store: Arc<Mutex<HashMap<Snowflake, Arc<RwLock<ObservableObject>>>>>,
url: String, url: String,
} }
@ -76,7 +75,6 @@ impl Gateway {
websocket_send: shared_websocket_send.clone(), websocket_send: shared_websocket_send.clone(),
websocket_receive, websocket_receive,
kill_send: kill_send.clone(), kill_send: kill_send.clone(),
kill_receive: kill_send.subscribe(),
store: store.clone(), store: store.clone(),
url: websocket_url.clone(), url: websocket_url.clone(),
}; };
@ -101,19 +99,11 @@ impl Gateway {
} }
/// The main gateway listener task; /// The main gateway listener task;
///
/// Can only be stopped by closing the websocket, cannot be made to listen for kill
pub async fn gateway_listen_task(&mut self) { pub async fn gateway_listen_task(&mut self) {
loop { loop {
let msg; let msg = self.websocket_receive.next().await;
tokio::select! {
Ok(_) = self.kill_receive.recv() => {
log::trace!("GW: Closing listener task");
break;
}
message = self.websocket_receive.next() => {
msg = message;
}
}
// PRETTYFYME: Remove inline conditional compiling // PRETTYFYME: Remove inline conditional compiling
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]

View File

@ -77,6 +77,11 @@ impl HeartbeatHandler {
let mut last_seq_number: Option<u64> = None; let mut last_seq_number: Option<u64> = None;
loop { loop {
if kill_receive.try_recv().is_ok() {
trace!("GW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged { let timeout = if last_heartbeat_acknowledged {
heartbeat_interval heartbeat_interval
} else { } else {
@ -110,10 +115,6 @@ impl HeartbeatHandler {
} }
} }
} }
Ok(_) = kill_receive.recv() => {
log::trace!("GW: Closing heartbeat task");
break;
}
} }
if should_send { if should_send {

View File

@ -61,7 +61,7 @@ pub struct VoiceGatewayReceivePayload<'a> {
impl<'a> WebSocketEvent for VoiceGatewayReceivePayload<'a> {} impl<'a> WebSocketEvent for VoiceGatewayReceivePayload<'a> {}
/// The modes of encryption available in voice UDP connections; /// The modes of encryption available in voice udp connections;
/// ///
/// Not all encryption modes are implemented; it is generally recommended /// Not all encryption modes are implemented; it is generally recommended
/// to use either [[VoiceEncryptionMode::Xsalsa20Poly1305]] or /// to use either [[VoiceEncryptionMode::Xsalsa20Poly1305]] or

View File

@ -12,7 +12,7 @@ use super::VoiceEncryptionMode;
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
/// The voice gateway's ready event; /// The voice gateway's ready event;
/// ///
/// Gives the user info about the UDP connection IP and port, srrc to use, /// Gives the user info about the udp connection ip and port, srrc to use,
/// available encryption modes and other data. /// available encryption modes and other data.
/// ///
/// Sent in response to an Identify event. /// Sent in response to an Identify event.
@ -23,7 +23,7 @@ pub struct VoiceReady {
pub ssrc: u32, pub ssrc: u32,
pub ip: Ipv4Addr, pub ip: Ipv4Addr,
pub port: u16, pub port: u16,
/// The available encryption modes for the UDP connection /// The available encryption modes for the udp connection
pub modes: Vec<VoiceEncryptionMode>, pub modes: Vec<VoiceEncryptionMode>,
#[serde(default)] #[serde(default)]
pub experiments: Vec<String>, pub experiments: Vec<String>,

View File

@ -34,7 +34,7 @@ pub enum VoiceProtocol {
#[default] #[default]
/// Sending data via UDP, documented and the only protocol chorus supports. /// Sending data via UDP, documented and the only protocol chorus supports.
Udp, Udp,
// Possible value, yet NOT RECOMMENDED, AS CHORUS DOES NOT SUPPORT WEBRTC // Possible value, yet NOT RECOMMENED, AS CHORUS DOES NOT SUPPORT WEBRTC
//Webrtc, //Webrtc,
} }
@ -43,9 +43,9 @@ pub enum VoiceProtocol {
/// ///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#protocol-data-structure> /// See <https://discord-userdoccers.vercel.app/topics/voice-connections#protocol-data-structure>
pub struct SelectProtocolData { pub struct SelectProtocolData {
/// Our external IP we got from IP discovery /// Our external ip we got from ip discovery
pub address: String, pub address: String,
/// Our external UDP port we got from IP discovery /// Our external udp port we got from id discovery
pub port: u16, pub port: u16,
/// The mode of encryption to use /// The mode of encryption to use
pub mode: VoiceEncryptionMode, pub mode: VoiceEncryptionMode,

View File

@ -9,7 +9,7 @@ use crate::types::{Snowflake, WebSocketEvent};
/// Event that tells the server we are speaking; /// Event that tells the server we are speaking;
/// ///
/// Essentially, what allows us to send UDP data and lights up the green circle around your avatar. /// Essentially, what allows us to send udp data and lights up the green circle around your avatar.
/// ///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#speaking-structure> /// See <https://discord-userdoccers.vercel.app/topics/voice-connections#speaking-structure>
#[derive(Debug, Deserialize, Serialize, Clone, Default)] #[derive(Debug, Deserialize, Serialize, Clone, Default)]

View File

@ -26,16 +26,8 @@ impl TungsteniteBackend {
websocket_url: &str, websocket_url: &str,
) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::VoiceGatewayError> { ) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::VoiceGatewayError> {
let mut roots = rustls::RootCertStore::empty(); let mut roots = rustls::RootCertStore::empty();
let certs = rustls_native_certs::load_native_certs(); for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs")
{
if let Err(e) = certs {
log::error!("Failed to load platform native certs! {:?}", e);
return Err(VoiceGatewayError::CannotConnect {
error: format!("{:?}", e),
});
}
for cert in certs.unwrap() {
roots.add(&rustls::Certificate(cert.0)).unwrap(); roots.add(&rustls::Certificate(cert.0)).unwrap();
} }
let (websocket_stream, _) = match connect_async_tls_with_config( let (websocket_stream, _) = match connect_async_tls_with_config(

View File

@ -37,7 +37,6 @@ pub struct VoiceGateway {
websocket_send: Arc<Mutex<Sink>>, websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream, websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>, kill_send: tokio::sync::broadcast::Sender<()>,
kill_receive: tokio::sync::broadcast::Receiver<()>,
} }
impl VoiceGateway { impl VoiceGateway {
@ -90,7 +89,6 @@ impl VoiceGateway {
websocket_send: shared_websocket_send.clone(), websocket_send: shared_websocket_send.clone(),
websocket_receive, websocket_receive,
kill_send: kill_send.clone(), kill_send: kill_send.clone(),
kill_receive: kill_send.subscribe(),
}; };
// Now we can continuously check for messages in a different task, since we aren't going to receive another hello // Now we can continuously check for messages in a different task, since we aren't going to receive another hello
@ -112,19 +110,11 @@ impl VoiceGateway {
} }
/// The main gateway listener task; /// The main gateway listener task;
///
/// Can only be stopped by closing the websocket, cannot be made to listen for kill
pub async fn gateway_listen_task(&mut self) { pub async fn gateway_listen_task(&mut self) {
loop { loop {
let msg; let msg = self.websocket_receive.next().await;
tokio::select! {
Ok(_) = self.kill_receive.recv() => {
log::trace!("VGW: Closing listener task");
break;
}
message = self.websocket_receive.next() => {
msg = message;
}
}
// PRETTYFYME: Remove inline conditional compiling // PRETTYFYME: Remove inline conditional compiling
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]

View File

@ -97,7 +97,7 @@ impl VoiceGatewayHandle {
/// Closes the websocket connection and stops all gateway tasks; /// Closes the websocket connection and stops all gateway tasks;
/// ///
/// Essentially pulls the plug on the voice gateway, leaving it possible to resume; /// Esentially pulls the plug on the voice gateway, leaving it possible to resume;
pub async fn close(&self) { pub async fn close(&self) {
self.kill_send.send(()).unwrap(); self.kill_send.send(()).unwrap();
self.websocket_send.lock().await.close().await.unwrap(); self.websocket_send.lock().await.close().await.unwrap();

View File

@ -98,6 +98,11 @@ impl VoiceHeartbeatHandler {
let mut nonce: u64 = starting_nonce; let mut nonce: u64 = starting_nonce;
loop { loop {
if kill_receive.try_recv().is_ok() {
trace!("VGW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged { let timeout = if last_heartbeat_acknowledged {
heartbeat_interval heartbeat_interval
} else { } else {
@ -131,10 +136,6 @@ impl VoiceHeartbeatHandler {
} }
} }
} }
Ok(_) = kill_receive.recv() => {
log::trace!("VGW: Closing heartbeat task");
break;
}
} }
if should_send { if should_send {

View File

@ -4,7 +4,7 @@
use crate::{errors::VoiceGatewayError, types::VoiceGatewayReceivePayload}; use crate::{errors::VoiceGatewayError, types::VoiceGatewayReceivePayload};
/// Represents a message received from the voice websocket connection. /// Represents a messsage received from the voice websocket connection.
/// ///
/// This will be either a [VoiceGatewayReceivePayload], containing voice gateway events, or a [VoiceGatewayError]. /// This will be either a [VoiceGatewayReceivePayload], containing voice gateway events, or a [VoiceGatewayError].
/// ///
@ -14,7 +14,7 @@ pub struct VoiceGatewayMessage(pub String);
impl VoiceGatewayMessage { impl VoiceGatewayMessage {
/// Parses the message as an error; /// Parses the message as an error;
/// Returns the error if successfully parsed, None if the message isn't an error /// Returns the error if succesfully parsed, None if the message isn't an error
pub fn error(&self) -> Option<VoiceGatewayError> { pub fn error(&self) -> Option<VoiceGatewayError> {
// Some error strings have dots on the end, which we don't care about // Some error strings have dots on the end, which we don't care about
let processed_content = self.0.to_lowercase().replace('.', ""); let processed_content = self.0.to_lowercase().replace('.', "");

View File

@ -24,7 +24,7 @@ use crate::{
use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE}; use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE};
/// Handle to a voice UDP connection /// Handle to a voice udp connection
/// ///
/// Can be safely cloned and will still correspond to the same connection. /// Can be safely cloned and will still correspond to the same connection.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -45,7 +45,7 @@ impl UdpHandle {
/// ///
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error. /// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
/// ///
/// If the UDP socket is broken, this returns a [VoiceUdpError::BrokenSocket] error. /// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_opus_data( pub async fn send_opus_data(
&self, &self,
timestamp: u32, timestamp: u32,
@ -151,8 +151,8 @@ impl UdpHandle {
data_lock.last_udp_encryption_nonce = Some(nonce); data_lock.last_udp_encryption_nonce = Some(nonce);
drop(data_lock); drop(data_lock);
// TODO: Is big endian correct? This is not documented anywhere // TODO: Is le correct? This is not documented anywhere
let mut bytes = nonce.to_be_bytes().to_vec(); let mut bytes = nonce.to_le_bytes().to_vec();
// This is 4 bytes, it has to be a different size, appends 0s // This is 4 bytes, it has to be a different size, appends 0s
while bytes.len() < 24 { while bytes.len() < 24 {
@ -212,7 +212,7 @@ impl UdpHandle {
let mut encrypted_payload = encryption_result.unwrap(); let mut encrypted_payload = encryption_result.unwrap();
// Append the nonce bytes, if needed // Append the nonce bytes, if needed
// All other encryption modes have an explicit nonce, whereas Xsalsa20Poly1305 // All other encryption modes have an explicit nonce, where as Xsalsa20Poly1305
// has the nonce as the rtp header. // has the nonce as the rtp header.
if session_description.encryption_mode != VoiceEncryptionMode::Xsalsa20Poly1305 { if session_description.encryption_mode != VoiceEncryptionMode::Xsalsa20Poly1305 {
encrypted_payload.append(&mut nonce_bytes); encrypted_payload.append(&mut nonce_bytes);

View File

@ -42,9 +42,9 @@ pub struct UdpHandler {
} }
impl UdpHandler { impl UdpHandler {
/// Spawns a new UDP handler and performs IP discovery. /// Spawns a new udp handler and performs ip discovery.
/// ///
/// Mutates the given data_reference with the IP discovery data. /// Mutates the given data_reference with the ip discovery data.
pub async fn spawn( pub async fn spawn(
data_reference: Arc<RwLock<VoiceData>>, data_reference: Arc<RwLock<VoiceData>>,
url: SocketAddr, url: SocketAddr,
@ -141,7 +141,7 @@ impl UdpHandler {
/// The main listen task; /// The main listen task;
/// ///
/// Receives UDP messages and parses them. /// Receives udp messages and parses them.
async fn listen_task(&mut self) { async fn listen_task(&mut self) {
loop { loop {
// FIXME: is there a max size for these packets? // FIXME: is there a max size for these packets?
@ -150,7 +150,7 @@ impl UdpHandler {
// Update: see <https://stackoverflow.com/questions/58097580/rtp-packet-maximum-size> // Update: see <https://stackoverflow.com/questions/58097580/rtp-packet-maximum-size>
// > "The RTP standard does not set a maximum size.." // > "The RTP standard does not set a maximum size.."
// //
// The theoretical max for this buffer would be 1458 bytes, but that is imo // The theorhetical max for this buffer would be 1458 bytes, but that is imo
// unreasonable to allocate for every message. // unreasonable to allocate for every message.
let mut buf: Vec<u8> = vec![0; 512]; let mut buf: Vec<u8> = vec![0; 512];

View File

@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this // License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/. // file, You can obtain one at http://mozilla.org/MPL/2.0/.
//! Defines the UDP component of voice communications, sending and receiving raw rtp data. //! Defines the udp component of voice communications, sending and receiving raw rtp data.
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#voice-packet-structure> /// See <https://discord-userdoccers.vercel.app/topics/voice-connections#voice-packet-structure>
/// This always adds up to 12 bytes /// This always adds up to 12 bytes

View File

@ -9,17 +9,17 @@ use crate::types::{SessionDescription, Snowflake, VoiceReady, VoiceServerUpdate}
#[derive(Debug, Default)] #[derive(Debug, Default)]
/// Saves data shared between parts of the voice architecture; /// Saves data shared between parts of the voice architecture;
/// ///
/// Struct used to give the UDP connection data received from the gateway. /// Struct used to give the Udp connection data received from the gateway.
pub struct VoiceData { pub struct VoiceData {
pub server_data: Option<VoiceServerUpdate>, pub server_data: Option<VoiceServerUpdate>,
pub ready_data: Option<VoiceReady>, pub ready_data: Option<VoiceReady>,
pub session_description: Option<SessionDescription>, pub session_description: Option<SessionDescription>,
pub user_id: Snowflake, pub user_id: Snowflake,
pub session_id: String, pub session_id: String,
/// The last sequence number we used, has to be incremented by one every time we send a message /// The last sequence number we used, has to be incremeted by one every time we send a message
pub last_sequence_number: u16, pub last_sequence_number: u16,
pub ip_discovery: Option<IpDiscovery>, pub ip_discovery: Option<IpDiscovery>,
/// The last UDP encryption nonce, if we are using an encryption mode with incremental nonces. /// The last udp encryption nonce, if we are using an encryption mode with incremental nonces.
pub last_udp_encryption_nonce: Option<u32>, pub last_udp_encryption_nonce: Option<u32>,
} }