feat: udp error handling, create udp/backends

This commit is contained in:
kozabrada123 2023-12-29 12:48:22 +01:00
parent 2dadd38604
commit e9ef2444d5
9 changed files with 190 additions and 40 deletions

View File

@ -129,3 +129,23 @@ custom_error! {
} }
impl WebSocketEvent for VoiceGatewayError {} impl WebSocketEvent for VoiceGatewayError {}
custom_error! {
/// Voice UDP errors.
#[derive(Clone, PartialEq, Eq)]
pub VoiceUdpError
// General errors
BrokenSocket{error: String} = "Could not write / read from udp socket: {error}",
NoData = "We have not set received the necessary data to perform this operation.",
// Encryption errors
NoKey = "Tried to encrypt / decrypt rtp data, but no key has been received yet",
FailedEncryption = "Tried to encrypt rtp data, but failed. Most likely this is an issue chorus' nonce generation. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new",
// Errors when initiating a socket connection
CannotBind{error: String} = "Cannot bind socket due to a udp error: {error}",
CannotConnect{error: String} = "Cannot connect due to a udp error: {error}",
}
impl WebSocketEvent for VoiceUdpError {}

View File

@ -4,7 +4,6 @@
/// Gets an xsalsa20poly1305 nonce from an rtppacket. /// Gets an xsalsa20poly1305 nonce from an rtppacket.
pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec<u8> { pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec<u8> {
let mut rtp_header = Vec::with_capacity(24); let mut rtp_header = Vec::with_capacity(24);
rtp_header.append(&mut packet[0..12].to_vec()); rtp_header.append(&mut packet[0..12].to_vec());

View File

@ -104,7 +104,8 @@ impl Observer<VoiceReady> for VoiceHandler {
std::net::SocketAddr::V4(SocketAddrV4::new(data.ip, data.port)), std::net::SocketAddr::V4(SocketAddrV4::new(data.ip, data.port)),
data.ssrc, data.ssrc,
) )
.await; .await
.unwrap();
let ip_discovery = self.data.read().await.ip_discovery.clone().unwrap(); let ip_discovery = self.data.read().await.ip_discovery.clone().unwrap();

View File

@ -0,0 +1,19 @@
#[cfg(all(not(target_arch = "wasm32"), feature = "client"))]
pub mod tokio;
#[cfg(all(not(target_arch = "wasm32"), feature = "client"))]
pub use tokio::*;
#[cfg(all(target_arch = "wasm32", feature = "client"))]
pub mod wasm;
#[cfg(all(target_arch = "wasm32", feature = "client"))]
pub use wasm::*;
#[cfg(all(not(target_arch = "wasm32"), feature = "client"))]
pub type UdpSocket = tokio::TokioSocket;
#[cfg(all(not(target_arch = "wasm32"), feature = "client"))]
pub type UdpBackend = tokio::TokioBackend;
#[cfg(all(target_arch = "wasm32", feature = "client"))]
pub type UdpSocket = wasm::WasmSocket;
#[cfg(all(target_arch = "wasm32", feature = "client"))]
pub type UdpBackend = wasm::WasmBackend;

View File

@ -0,0 +1,33 @@
use std::net::SocketAddr;
use crate::errors::VoiceUdpError;
#[derive(Debug, Clone)]
pub struct TokioBackend;
pub type TokioSocket = tokio::net::UdpSocket;
impl TokioBackend {
pub async fn connect(url: SocketAddr) -> Result<TokioSocket, VoiceUdpError> {
// Bind with a port number of 0, so the os assigns this listener a port
let udp_socket_result = TokioSocket::bind("0.0.0.0:0").await;
if let Err(e) = udp_socket_result {
return Err(VoiceUdpError::CannotBind {
error: format!("{:?}", e),
});
}
let udp_socket = udp_socket_result.unwrap();
let connection_result = udp_socket.connect(url).await;
if let Err(e) = connection_result {
return Err(VoiceUdpError::CannotConnect {
error: format!("{:?}", e),
});
}
Ok(udp_socket)
}
}

View File

@ -0,0 +1,13 @@
use std::net::SocketAddr;
// TODO: Add wasm websockets
compile_error!("Udp voice support is not implemented yet for wasm.");
#[derive(Debug, Clone)]
pub struct WasmBackend;
pub type WasmSocket;
impl WasmBackend {
pub async fn connect(url: SocketAddr) -> Result<WasmSocket, VoiceUdpError> {}
}

View File

@ -7,9 +7,14 @@ use discortp::Packet;
use log::*; use log::*;
use tokio::{net::UdpSocket, sync::Mutex, sync::RwLock}; use tokio::{sync::Mutex, sync::RwLock};
use crate::voice::{crypto, voice_data::VoiceData}; use super::UdpSocket;
use crate::{
errors::VoiceUdpError,
voice::{crypto, voice_data::VoiceData},
};
use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE}; use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE};
@ -27,8 +32,25 @@ impl UdpHandle {
/// Constructs and sends encoded opus rtp data. /// Constructs and sends encoded opus rtp data.
/// ///
/// Automatically makes an [RtpPacket](discorrtp::rtp::RtpPacket), encrypts it and sends it. /// Automatically makes an [RtpPacket](discorrtp::rtp::RtpPacket), encrypts it and sends it.
pub async fn send_opus_data(&self, timestamp: u32, payload: Vec<u8>) { ///
let ssrc = self.data.read().await.ready_data.clone().unwrap().ssrc; /// # Errors
/// If we do not have VoiceReady data, which contains our ssrc, this returns a
/// [VoiceUdpError::NoData] error.
///
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_opus_data(
&self,
timestamp: u32,
payload: Vec<u8>,
) -> Result<(), VoiceUdpError> {
let voice_ready_data_result = self.data.read().await.ready_data.clone();
if let None = voice_ready_data_result {
return Err(VoiceUdpError::NoData);
}
let ssrc = voice_ready_data_result.unwrap().ssrc;
let sequence_number = self.data.read().await.last_sequence_number.wrapping_add(1); let sequence_number = self.data.read().await.last_sequence_number.wrapping_add(1);
self.data.write().await.last_sequence_number = sequence_number; self.data.write().await.last_sequence_number = sequence_number;
@ -54,34 +76,46 @@ impl UdpHandle {
let mut buffer = vec![0; buffer_size]; let mut buffer = vec![0; buffer_size];
let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap(); let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).expect("Mangled rtp packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
rtp_packet.populate(&rtp_data); rtp_packet.populate(&rtp_data);
self.send_rtp_packet(rtp_packet).await; self.send_rtp_packet(rtp_packet).await
} }
/// Encrypts and sends and rtp packet. /// Encrypts and sends and rtp packet.
pub async fn send_rtp_packet(&self, packet: discortp::rtp::MutableRtpPacket<'_>) { ///
let mut buffer = self.encrypt_rtp_packet_payload(&packet).await; /// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_rtp_packet(
&self,
packet: discortp::rtp::MutableRtpPacket<'_>,
) -> Result<(), VoiceUdpError> {
let mut buffer = self.encrypt_rtp_packet_payload(&packet).await?;
let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap(); let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
self.send_encrypted_rtp_packet(new_packet.consume_to_immutable()) self.send_encrypted_rtp_packet(new_packet.consume_to_immutable())
.await; .await?;
Ok(())
} }
/// Encrypts an unencrypted rtp packet, returning a copy of the packet's bytes with an /// Encrypts an unencrypted rtp packet, returning a copy of the packet's bytes with an
/// encrypted payload /// encrypted payload
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
pub async fn encrypt_rtp_packet_payload( pub async fn encrypt_rtp_packet_payload(
&self, &self,
packet: &discortp::rtp::MutableRtpPacket<'_>, packet: &discortp::rtp::MutableRtpPacket<'_>,
) -> Vec<u8> { ) -> Result<Vec<u8>, VoiceUdpError> {
let payload = packet.payload(); let payload = packet.payload();
let session_description_result = self.data.read().await.session_description.clone(); let session_description_result = self.data.read().await.session_description.clone();
// We are trying to encrypt, but have not received SessionDescription yet,
// which contains the secret key.
if session_description_result.is_none() { if session_description_result.is_none() {
// FIXME: Make this function reutrn a result with a proper error type for these kinds return Err(VoiceUdpError::NoKey);
// of functions
panic!("Trying to encrypt packet but no key provided yet");
} }
let session_description = session_description_result.unwrap(); let session_description = session_description_result.unwrap();
@ -96,8 +130,11 @@ impl UdpHandle {
let encryption_result = encryptor.encrypt(nonce, payload); let encryption_result = encryptor.encrypt(nonce, payload);
if encryption_result.is_err() { if encryption_result.is_err() {
// FIXME: See above fixme // Safety: If encryption errors here, it's chorus' fault, and it makes no sense to
panic!("Encryption error"); // return the error to the user.
//
// This is not an error the user should account for, which is why we throw it here.
panic!("{}", VoiceUdpError::FailedEncryption);
} }
let mut encrypted_payload = encryption_result.unwrap(); let mut encrypted_payload = encryption_result.unwrap();
@ -113,15 +150,28 @@ impl UdpHandle {
new_buffer.append(&mut rtp_header); new_buffer.append(&mut rtp_header);
new_buffer.append(&mut encrypted_payload); new_buffer.append(&mut encrypted_payload);
new_buffer Ok(new_buffer)
} }
/// Sends an (already encrypted) rtp packet to the connection. /// Sends an (already encrypted) rtp packet to the connection.
pub async fn send_encrypted_rtp_packet(&self, packet: discortp::rtp::RtpPacket<'_>) { ///
/// # Errors
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_encrypted_rtp_packet(
&self,
packet: discortp::rtp::RtpPacket<'_>,
) -> Result<(), VoiceUdpError> {
let raw_bytes = packet.packet(); let raw_bytes = packet.packet();
self.socket.send(raw_bytes).await.unwrap(); let send_res = self.socket.send(raw_bytes).await;
if let Err(e) = send_res {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
debug!("VUDP: Sent rtp packet!"); debug!("VUDP: Sent rtp packet!");
Ok(())
} }
} }

View File

@ -12,13 +12,14 @@ use discortp::discord::{
use discortp::rtcp::report::ReceiverReport; use discortp::rtcp::report::ReceiverReport;
use discortp::rtcp::report::SenderReport; use discortp::rtcp::report::SenderReport;
use discortp::{demux::demux, Packet}; use discortp::{demux::demux, Packet};
use tokio::{ use tokio::sync::{Mutex, RwLock};
net::UdpSocket,
sync::{Mutex, RwLock}, use super::UdpBackend;
}; use super::UdpSocket;
use crate::voice::crypto::get_xsalsa20_poly1305_nonce;
use super::RTP_HEADER_SIZE; use super::RTP_HEADER_SIZE;
use crate::errors::VoiceUdpError;
use crate::voice::crypto::get_xsalsa20_poly1305_nonce;
use crate::voice::voice_data::VoiceData; use crate::voice::voice_data::VoiceData;
use super::{events::VoiceUDPEvents, UdpHandle}; use super::{events::VoiceUDPEvents, UdpHandle};
@ -41,11 +42,8 @@ impl UdpHandler {
data_reference: Arc<RwLock<VoiceData>>, data_reference: Arc<RwLock<VoiceData>>,
url: SocketAddr, url: SocketAddr,
ssrc: u32, ssrc: u32,
) -> UdpHandle { ) -> Result<UdpHandle, VoiceUdpError> {
// Bind with a port number of 0, so the os assigns this listener a port let udp_socket = UdpBackend::connect(url).await?;
let udp_socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
udp_socket.connect(url).await.unwrap();
// First perform ip discovery // First perform ip discovery
let ip_discovery = IpDiscovery { let ip_discovery = IpDiscovery {
@ -57,14 +55,15 @@ impl UdpHandler {
payload: Vec::new(), payload: Vec::new(),
}; };
// Minimum size with an empty Address value, + 64 bytes for the actual address size
let size = IpDiscoveryPacket::minimum_packet_size() + 64; let size = IpDiscoveryPacket::minimum_packet_size() + 64;
let mut buf: Vec<u8> = vec![0; size]; let mut buf: Vec<u8> = vec![0; size];
// TODO: Make this not panic everything // Safety: expect is justified here, since this is an error which should never happen.
// Actually, if this panics, something is very, very wrong // If this errors, the code at fault is the buffer size calculation.
let mut ip_discovery_packet = let mut ip_discovery_packet =
MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet"); MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
ip_discovery_packet.populate(&ip_discovery); ip_discovery_packet.populate(&ip_discovery);
@ -72,20 +71,34 @@ impl UdpHandler {
info!("VUDP: Sending Ip Discovery {:?}", &data); info!("VUDP: Sending Ip Discovery {:?}", &data);
udp_socket.send(data).await.unwrap(); let send_res = udp_socket.send(data).await;
if let Err(e) = send_res {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
info!("VUDP: Sent packet discovery request"); info!("VUDP: Sent packet discovery request");
// Handle the ip discovery response // Handle the ip discovery response
let receieved_size = udp_socket.recv(&mut buf).await.unwrap(); let received_size_or_err = udp_socket.recv(&mut buf).await;
if let Err(e) = received_size_or_err {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
let received_size = received_size_or_err.unwrap();
info!( info!(
"VUDP: Receiving messsage: {:?} - (expected {} vs real {})", "VUDP: Receiving messsage: {:?} - (expected {} vs real {})",
buf.clone(), buf.clone(),
size, size,
receieved_size received_size
); );
let receieved_ip_discovery = IpDiscoveryPacket::new(&buf).unwrap(); let receieved_ip_discovery = IpDiscoveryPacket::new(&buf).expect("Could not make ipdiscovery packet from received data, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
info!( info!(
"VUDP: Received ip discovery!!! {:?}", "VUDP: Received ip discovery!!! {:?}",
@ -121,11 +134,11 @@ impl UdpHandler {
handler.listen_task().await; handler.listen_task().await;
}); });
UdpHandle { Ok(UdpHandle {
events: shared_events, events: shared_events,
socket, socket,
data: data_reference, data: data_reference,
} })
} }
/// The main listen task; /// The main listen task;

View File

@ -4,9 +4,11 @@
/// This always adds up to 12 bytes /// This always adds up to 12 bytes
const RTP_HEADER_SIZE: u8 = 12; const RTP_HEADER_SIZE: u8 = 12;
pub mod handle; pub mod backends;
pub mod events; pub mod events;
pub mod handle;
pub mod handler; pub mod handler;
pub use backends::*;
pub use handle::*; pub use handle::*;
pub use handler::*; pub use handler::*;