From 91a27355a9599fa8046324562aa819afbb293d81 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Fri, 29 Dec 2023 12:48:22 +0100 Subject: [PATCH] feat: udp error handling, create udp/backends --- src/errors.rs | 20 ++++++++ src/voice/crypto.rs | 1 - src/voice/handler.rs | 3 +- src/voice/udp/backends/mod.rs | 19 ++++++++ src/voice/udp/backends/tokio.rs | 33 +++++++++++++ src/voice/udp/backends/wasm.rs | 13 +++++ src/voice/udp/handle.rs | 86 ++++++++++++++++++++++++++------- src/voice/udp/handler.rs | 51 +++++++++++-------- src/voice/udp/mod.rs | 4 +- 9 files changed, 190 insertions(+), 40 deletions(-) create mode 100644 src/voice/udp/backends/mod.rs create mode 100644 src/voice/udp/backends/tokio.rs create mode 100644 src/voice/udp/backends/wasm.rs diff --git a/src/errors.rs b/src/errors.rs index 15c5b44..bf3727c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -129,3 +129,23 @@ custom_error! { } impl WebSocketEvent for VoiceGatewayError {} + +custom_error! { + /// Voice UDP errors. + #[derive(Clone, PartialEq, Eq)] + pub VoiceUdpError + + // General errors + BrokenSocket{error: String} = "Could not write / read from udp socket: {error}", + NoData = "We have not set received the necessary data to perform this operation.", + + // Encryption errors + NoKey = "Tried to encrypt / decrypt rtp data, but no key has been received yet", + FailedEncryption = "Tried to encrypt rtp data, but failed. Most likely this is an issue chorus' nonce generation. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new", + + // Errors when initiating a socket connection + CannotBind{error: String} = "Cannot bind socket due to a udp error: {error}", + CannotConnect{error: String} = "Cannot connect due to a udp error: {error}", +} + +impl WebSocketEvent for VoiceUdpError {} diff --git a/src/voice/crypto.rs b/src/voice/crypto.rs index 172abf3..7bcc056 100644 --- a/src/voice/crypto.rs +++ b/src/voice/crypto.rs @@ -4,7 +4,6 @@ /// Gets an xsalsa20poly1305 nonce from an rtppacket. pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec { - let mut rtp_header = Vec::with_capacity(24); rtp_header.append(&mut packet[0..12].to_vec()); diff --git a/src/voice/handler.rs b/src/voice/handler.rs index c8ddd8c..aa3abcb 100644 --- a/src/voice/handler.rs +++ b/src/voice/handler.rs @@ -104,7 +104,8 @@ impl Observer for VoiceHandler { std::net::SocketAddr::V4(SocketAddrV4::new(data.ip, data.port)), data.ssrc, ) - .await; + .await + .unwrap(); let ip_discovery = self.data.read().await.ip_discovery.clone().unwrap(); diff --git a/src/voice/udp/backends/mod.rs b/src/voice/udp/backends/mod.rs new file mode 100644 index 0000000..521d085 --- /dev/null +++ b/src/voice/udp/backends/mod.rs @@ -0,0 +1,19 @@ +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub mod tokio; +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub use tokio::*; + +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub mod wasm; +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub use wasm::*; + +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub type UdpSocket = tokio::TokioSocket; +#[cfg(all(not(target_arch = "wasm32"), feature = "client"))] +pub type UdpBackend = tokio::TokioBackend; + +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub type UdpSocket = wasm::WasmSocket; +#[cfg(all(target_arch = "wasm32", feature = "client"))] +pub type UdpBackend = wasm::WasmBackend; diff --git a/src/voice/udp/backends/tokio.rs b/src/voice/udp/backends/tokio.rs new file mode 100644 index 0000000..eec1386 --- /dev/null +++ b/src/voice/udp/backends/tokio.rs @@ -0,0 +1,33 @@ +use std::net::SocketAddr; + +use crate::errors::VoiceUdpError; + +#[derive(Debug, Clone)] +pub struct TokioBackend; + +pub type TokioSocket = tokio::net::UdpSocket; + +impl TokioBackend { + pub async fn connect(url: SocketAddr) -> Result { + // Bind with a port number of 0, so the os assigns this listener a port + let udp_socket_result = TokioSocket::bind("0.0.0.0:0").await; + + if let Err(e) = udp_socket_result { + return Err(VoiceUdpError::CannotBind { + error: format!("{:?}", e), + }); + } + + let udp_socket = udp_socket_result.unwrap(); + + let connection_result = udp_socket.connect(url).await; + + if let Err(e) = connection_result { + return Err(VoiceUdpError::CannotConnect { + error: format!("{:?}", e), + }); + } + + Ok(udp_socket) + } +} diff --git a/src/voice/udp/backends/wasm.rs b/src/voice/udp/backends/wasm.rs new file mode 100644 index 0000000..502987b --- /dev/null +++ b/src/voice/udp/backends/wasm.rs @@ -0,0 +1,13 @@ +use std::net::SocketAddr; + +// TODO: Add wasm websockets +compile_error!("Udp voice support is not implemented yet for wasm."); + +#[derive(Debug, Clone)] +pub struct WasmBackend; + +pub type WasmSocket; + +impl WasmBackend { + pub async fn connect(url: SocketAddr) -> Result {} +} diff --git a/src/voice/udp/handle.rs b/src/voice/udp/handle.rs index f402ef0..e597f11 100644 --- a/src/voice/udp/handle.rs +++ b/src/voice/udp/handle.rs @@ -7,9 +7,14 @@ use discortp::Packet; use log::*; -use tokio::{net::UdpSocket, sync::Mutex, sync::RwLock}; +use tokio::{sync::Mutex, sync::RwLock}; -use crate::voice::{crypto, voice_data::VoiceData}; +use super::UdpSocket; + +use crate::{ + errors::VoiceUdpError, + voice::{crypto, voice_data::VoiceData}, +}; use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE}; @@ -27,8 +32,25 @@ impl UdpHandle { /// Constructs and sends encoded opus rtp data. /// /// Automatically makes an [RtpPacket](discorrtp::rtp::RtpPacket), encrypts it and sends it. - pub async fn send_opus_data(&self, timestamp: u32, payload: Vec) { - let ssrc = self.data.read().await.ready_data.clone().unwrap().ssrc; + /// + /// # Errors + /// If we do not have VoiceReady data, which contains our ssrc, this returns a + /// [VoiceUdpError::NoData] error. + /// + /// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error. + /// + /// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error. + pub async fn send_opus_data( + &self, + timestamp: u32, + payload: Vec, + ) -> Result<(), VoiceUdpError> { + let voice_ready_data_result = self.data.read().await.ready_data.clone(); + if let None = voice_ready_data_result { + return Err(VoiceUdpError::NoData); + } + + let ssrc = voice_ready_data_result.unwrap().ssrc; let sequence_number = self.data.read().await.last_sequence_number.wrapping_add(1); self.data.write().await.last_sequence_number = sequence_number; @@ -54,34 +76,46 @@ impl UdpHandle { let mut buffer = vec![0; buffer_size]; - let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap(); + let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).expect("Mangled rtp packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new"); rtp_packet.populate(&rtp_data); - self.send_rtp_packet(rtp_packet).await; + self.send_rtp_packet(rtp_packet).await } /// Encrypts and sends and rtp packet. - pub async fn send_rtp_packet(&self, packet: discortp::rtp::MutableRtpPacket<'_>) { - let mut buffer = self.encrypt_rtp_packet_payload(&packet).await; + /// + /// # Errors + /// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error. + /// + /// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error. + pub async fn send_rtp_packet( + &self, + packet: discortp::rtp::MutableRtpPacket<'_>, + ) -> Result<(), VoiceUdpError> { + let mut buffer = self.encrypt_rtp_packet_payload(&packet).await?; let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap(); self.send_encrypted_rtp_packet(new_packet.consume_to_immutable()) - .await; + .await?; + Ok(()) } /// Encrypts an unencrypted rtp packet, returning a copy of the packet's bytes with an /// encrypted payload + /// + /// # Errors + /// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error. pub async fn encrypt_rtp_packet_payload( &self, packet: &discortp::rtp::MutableRtpPacket<'_>, - ) -> Vec { + ) -> Result, VoiceUdpError> { let payload = packet.payload(); let session_description_result = self.data.read().await.session_description.clone(); + // We are trying to encrypt, but have not received SessionDescription yet, + // which contains the secret key. if session_description_result.is_none() { - // FIXME: Make this function reutrn a result with a proper error type for these kinds - // of functions - panic!("Trying to encrypt packet but no key provided yet"); + return Err(VoiceUdpError::NoKey); } let session_description = session_description_result.unwrap(); @@ -96,8 +130,11 @@ impl UdpHandle { let encryption_result = encryptor.encrypt(nonce, payload); if encryption_result.is_err() { - // FIXME: See above fixme - panic!("Encryption error"); + // Safety: If encryption errors here, it's chorus' fault, and it makes no sense to + // return the error to the user. + // + // This is not an error the user should account for, which is why we throw it here. + panic!("{}", VoiceUdpError::FailedEncryption); } let mut encrypted_payload = encryption_result.unwrap(); @@ -113,15 +150,28 @@ impl UdpHandle { new_buffer.append(&mut rtp_header); new_buffer.append(&mut encrypted_payload); - new_buffer + Ok(new_buffer) } /// Sends an (already encrypted) rtp packet to the connection. - pub async fn send_encrypted_rtp_packet(&self, packet: discortp::rtp::RtpPacket<'_>) { + /// + /// # Errors + /// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error. + pub async fn send_encrypted_rtp_packet( + &self, + packet: discortp::rtp::RtpPacket<'_>, + ) -> Result<(), VoiceUdpError> { let raw_bytes = packet.packet(); - self.socket.send(raw_bytes).await.unwrap(); + let send_res = self.socket.send(raw_bytes).await; + if let Err(e) = send_res { + return Err(VoiceUdpError::BrokenSocket { + error: format!("{:?}", e), + }); + } debug!("VUDP: Sent rtp packet!"); + + Ok(()) } } diff --git a/src/voice/udp/handler.rs b/src/voice/udp/handler.rs index af9f4c3..a7b05b2 100644 --- a/src/voice/udp/handler.rs +++ b/src/voice/udp/handler.rs @@ -12,13 +12,14 @@ use discortp::discord::{ use discortp::rtcp::report::ReceiverReport; use discortp::rtcp::report::SenderReport; use discortp::{demux::demux, Packet}; -use tokio::{ - net::UdpSocket, - sync::{Mutex, RwLock}, -}; +use tokio::sync::{Mutex, RwLock}; + +use super::UdpBackend; +use super::UdpSocket; -use crate::voice::crypto::get_xsalsa20_poly1305_nonce; use super::RTP_HEADER_SIZE; +use crate::errors::VoiceUdpError; +use crate::voice::crypto::get_xsalsa20_poly1305_nonce; use crate::voice::voice_data::VoiceData; use super::{events::VoiceUDPEvents, UdpHandle}; @@ -41,11 +42,8 @@ impl UdpHandler { data_reference: Arc>, url: SocketAddr, ssrc: u32, - ) -> UdpHandle { - // Bind with a port number of 0, so the os assigns this listener a port - let udp_socket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); - - udp_socket.connect(url).await.unwrap(); + ) -> Result { + let udp_socket = UdpBackend::connect(url).await?; // First perform ip discovery let ip_discovery = IpDiscovery { @@ -57,14 +55,15 @@ impl UdpHandler { payload: Vec::new(), }; + // Minimum size with an empty Address value, + 64 bytes for the actual address size let size = IpDiscoveryPacket::minimum_packet_size() + 64; let mut buf: Vec = vec![0; size]; - // TODO: Make this not panic everything - // Actually, if this panics, something is very, very wrong + // Safety: expect is justified here, since this is an error which should never happen. + // If this errors, the code at fault is the buffer size calculation. let mut ip_discovery_packet = - MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet"); + MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new"); ip_discovery_packet.populate(&ip_discovery); @@ -72,20 +71,34 @@ impl UdpHandler { info!("VUDP: Sending Ip Discovery {:?}", &data); - udp_socket.send(data).await.unwrap(); + let send_res = udp_socket.send(data).await; + if let Err(e) = send_res { + return Err(VoiceUdpError::BrokenSocket { + error: format!("{:?}", e), + }); + } info!("VUDP: Sent packet discovery request"); // Handle the ip discovery response - let receieved_size = udp_socket.recv(&mut buf).await.unwrap(); + let received_size_or_err = udp_socket.recv(&mut buf).await; + + if let Err(e) = received_size_or_err { + return Err(VoiceUdpError::BrokenSocket { + error: format!("{:?}", e), + }); + } + + let received_size = received_size_or_err.unwrap(); + info!( "VUDP: Receiving messsage: {:?} - (expected {} vs real {})", buf.clone(), size, - receieved_size + received_size ); - let receieved_ip_discovery = IpDiscoveryPacket::new(&buf).unwrap(); + let receieved_ip_discovery = IpDiscoveryPacket::new(&buf).expect("Could not make ipdiscovery packet from received data, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new"); info!( "VUDP: Received ip discovery!!! {:?}", @@ -121,11 +134,11 @@ impl UdpHandler { handler.listen_task().await; }); - UdpHandle { + Ok(UdpHandle { events: shared_events, socket, data: data_reference, - } + }) } /// The main listen task; diff --git a/src/voice/udp/mod.rs b/src/voice/udp/mod.rs index 3d0b276..37f021c 100644 --- a/src/voice/udp/mod.rs +++ b/src/voice/udp/mod.rs @@ -4,9 +4,11 @@ /// This always adds up to 12 bytes const RTP_HEADER_SIZE: u8 = 12; -pub mod handle; +pub mod backends; pub mod events; +pub mod handle; pub mod handler; +pub use backends::*; pub use handle::*; pub use handler::*;