feat: udp error handling, create udp/backends

This commit is contained in:
kozabrada123 2023-12-29 12:48:22 +01:00
parent ad4db5794b
commit 91a27355a9
9 changed files with 190 additions and 40 deletions

View File

@ -129,3 +129,23 @@ custom_error! {
}
impl WebSocketEvent for VoiceGatewayError {}
custom_error! {
/// Voice UDP errors.
#[derive(Clone, PartialEq, Eq)]
pub VoiceUdpError
// General errors
BrokenSocket{error: String} = "Could not write / read from udp socket: {error}",
NoData = "We have not set received the necessary data to perform this operation.",
// Encryption errors
NoKey = "Tried to encrypt / decrypt rtp data, but no key has been received yet",
FailedEncryption = "Tried to encrypt rtp data, but failed. Most likely this is an issue chorus' nonce generation. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new",
// Errors when initiating a socket connection
CannotBind{error: String} = "Cannot bind socket due to a udp error: {error}",
CannotConnect{error: String} = "Cannot connect due to a udp error: {error}",
}
impl WebSocketEvent for VoiceUdpError {}

View File

@ -4,7 +4,6 @@
/// Gets an xsalsa20poly1305 nonce from an rtppacket.
pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec<u8> {
let mut rtp_header = Vec::with_capacity(24);
rtp_header.append(&mut packet[0..12].to_vec());

View File

@ -104,7 +104,8 @@ impl Observer<VoiceReady> for VoiceHandler {
std::net::SocketAddr::V4(SocketAddrV4::new(data.ip, data.port)),
data.ssrc,
)
.await;
.await
.unwrap();
let ip_discovery = self.data.read().await.ip_discovery.clone().unwrap();

View File

@ -0,0 +1,19 @@
#[cfg(all(not(target_arch = "wasm32"), feature = "client"))]
pub mod tokio;
#[cfg(all(not(target_arch = "wasm32"), feature = "client"))]
pub use tokio::*;
#[cfg(all(target_arch = "wasm32", feature = "client"))]
pub mod wasm;
#[cfg(all(target_arch = "wasm32", feature = "client"))]
pub use wasm::*;
#[cfg(all(not(target_arch = "wasm32"), feature = "client"))]
pub type UdpSocket = tokio::TokioSocket;
#[cfg(all(not(target_arch = "wasm32"), feature = "client"))]
pub type UdpBackend = tokio::TokioBackend;
#[cfg(all(target_arch = "wasm32", feature = "client"))]
pub type UdpSocket = wasm::WasmSocket;
#[cfg(all(target_arch = "wasm32", feature = "client"))]
pub type UdpBackend = wasm::WasmBackend;

View File

@ -0,0 +1,33 @@
use std::net::SocketAddr;
use crate::errors::VoiceUdpError;
#[derive(Debug, Clone)]
pub struct TokioBackend;
pub type TokioSocket = tokio::net::UdpSocket;
impl TokioBackend {
pub async fn connect(url: SocketAddr) -> Result<TokioSocket, VoiceUdpError> {
// Bind with a port number of 0, so the os assigns this listener a port
let udp_socket_result = TokioSocket::bind("0.0.0.0:0").await;
if let Err(e) = udp_socket_result {
return Err(VoiceUdpError::CannotBind {
error: format!("{:?}", e),
});
}
let udp_socket = udp_socket_result.unwrap();
let connection_result = udp_socket.connect(url).await;
if let Err(e) = connection_result {
return Err(VoiceUdpError::CannotConnect {
error: format!("{:?}", e),
});
}
Ok(udp_socket)
}
}

View File

@ -0,0 +1,13 @@
use std::net::SocketAddr;
// TODO: Add wasm websockets
compile_error!("Udp voice support is not implemented yet for wasm.");
#[derive(Debug, Clone)]
pub struct WasmBackend;
pub type WasmSocket;
impl WasmBackend {
pub async fn connect(url: SocketAddr) -> Result<WasmSocket, VoiceUdpError> {}
}

View File

@ -7,9 +7,14 @@ use discortp::Packet;
use log::*;
use tokio::{net::UdpSocket, sync::Mutex, sync::RwLock};
use tokio::{sync::Mutex, sync::RwLock};
use crate::voice::{crypto, voice_data::VoiceData};
use super::UdpSocket;
use crate::{
errors::VoiceUdpError,
voice::{crypto, voice_data::VoiceData},
};
use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE};
@ -27,8 +32,25 @@ impl UdpHandle {
/// Constructs and sends encoded opus rtp data.
///
/// Automatically makes an [RtpPacket](discorrtp::rtp::RtpPacket), encrypts it and sends it.
pub async fn send_opus_data(&self, timestamp: u32, payload: Vec<u8>) {
let ssrc = self.data.read().await.ready_data.clone().unwrap().ssrc;
///
/// # Errors
/// If we do not have VoiceReady data, which contains our ssrc, this returns a
/// [VoiceUdpError::NoData] error.
///
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_opus_data(
&self,
timestamp: u32,
payload: Vec<u8>,
) -> Result<(), VoiceUdpError> {
let voice_ready_data_result = self.data.read().await.ready_data.clone();
if let None = voice_ready_data_result {
return Err(VoiceUdpError::NoData);
}
let ssrc = voice_ready_data_result.unwrap().ssrc;
let sequence_number = self.data.read().await.last_sequence_number.wrapping_add(1);
self.data.write().await.last_sequence_number = sequence_number;
@ -54,34 +76,46 @@ impl UdpHandle {
let mut buffer = vec![0; buffer_size];
let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).expect("Mangled rtp packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
rtp_packet.populate(&rtp_data);
self.send_rtp_packet(rtp_packet).await;
self.send_rtp_packet(rtp_packet).await
}
/// Encrypts and sends and rtp packet.
pub async fn send_rtp_packet(&self, packet: discortp::rtp::MutableRtpPacket<'_>) {
let mut buffer = self.encrypt_rtp_packet_payload(&packet).await;
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_rtp_packet(
&self,
packet: discortp::rtp::MutableRtpPacket<'_>,
) -> Result<(), VoiceUdpError> {
let mut buffer = self.encrypt_rtp_packet_payload(&packet).await?;
let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
self.send_encrypted_rtp_packet(new_packet.consume_to_immutable())
.await;
.await?;
Ok(())
}
/// Encrypts an unencrypted rtp packet, returning a copy of the packet's bytes with an
/// encrypted payload
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
pub async fn encrypt_rtp_packet_payload(
&self,
packet: &discortp::rtp::MutableRtpPacket<'_>,
) -> Vec<u8> {
) -> Result<Vec<u8>, VoiceUdpError> {
let payload = packet.payload();
let session_description_result = self.data.read().await.session_description.clone();
// We are trying to encrypt, but have not received SessionDescription yet,
// which contains the secret key.
if session_description_result.is_none() {
// FIXME: Make this function reutrn a result with a proper error type for these kinds
// of functions
panic!("Trying to encrypt packet but no key provided yet");
return Err(VoiceUdpError::NoKey);
}
let session_description = session_description_result.unwrap();
@ -96,8 +130,11 @@ impl UdpHandle {
let encryption_result = encryptor.encrypt(nonce, payload);
if encryption_result.is_err() {
// FIXME: See above fixme
panic!("Encryption error");
// Safety: If encryption errors here, it's chorus' fault, and it makes no sense to
// return the error to the user.
//
// This is not an error the user should account for, which is why we throw it here.
panic!("{}", VoiceUdpError::FailedEncryption);
}
let mut encrypted_payload = encryption_result.unwrap();
@ -113,15 +150,28 @@ impl UdpHandle {
new_buffer.append(&mut rtp_header);
new_buffer.append(&mut encrypted_payload);
new_buffer
Ok(new_buffer)
}
/// Sends an (already encrypted) rtp packet to the connection.
pub async fn send_encrypted_rtp_packet(&self, packet: discortp::rtp::RtpPacket<'_>) {
///
/// # Errors
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_encrypted_rtp_packet(
&self,
packet: discortp::rtp::RtpPacket<'_>,
) -> Result<(), VoiceUdpError> {
let raw_bytes = packet.packet();
self.socket.send(raw_bytes).await.unwrap();
let send_res = self.socket.send(raw_bytes).await;
if let Err(e) = send_res {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
debug!("VUDP: Sent rtp packet!");
Ok(())
}
}

View File

@ -12,13 +12,14 @@ use discortp::discord::{
use discortp::rtcp::report::ReceiverReport;
use discortp::rtcp::report::SenderReport;
use discortp::{demux::demux, Packet};
use tokio::{
net::UdpSocket,
sync::{Mutex, RwLock},
};
use tokio::sync::{Mutex, RwLock};
use super::UdpBackend;
use super::UdpSocket;
use crate::voice::crypto::get_xsalsa20_poly1305_nonce;
use super::RTP_HEADER_SIZE;
use crate::errors::VoiceUdpError;
use crate::voice::crypto::get_xsalsa20_poly1305_nonce;
use crate::voice::voice_data::VoiceData;
use super::{events::VoiceUDPEvents, UdpHandle};
@ -41,11 +42,8 @@ impl UdpHandler {
data_reference: Arc<RwLock<VoiceData>>,
url: SocketAddr,
ssrc: u32,
) -> UdpHandle {
// Bind with a port number of 0, so the os assigns this listener a port
let udp_socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
udp_socket.connect(url).await.unwrap();
) -> Result<UdpHandle, VoiceUdpError> {
let udp_socket = UdpBackend::connect(url).await?;
// First perform ip discovery
let ip_discovery = IpDiscovery {
@ -57,14 +55,15 @@ impl UdpHandler {
payload: Vec::new(),
};
// Minimum size with an empty Address value, + 64 bytes for the actual address size
let size = IpDiscoveryPacket::minimum_packet_size() + 64;
let mut buf: Vec<u8> = vec![0; size];
// TODO: Make this not panic everything
// Actually, if this panics, something is very, very wrong
// Safety: expect is justified here, since this is an error which should never happen.
// If this errors, the code at fault is the buffer size calculation.
let mut ip_discovery_packet =
MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet");
MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
ip_discovery_packet.populate(&ip_discovery);
@ -72,20 +71,34 @@ impl UdpHandler {
info!("VUDP: Sending Ip Discovery {:?}", &data);
udp_socket.send(data).await.unwrap();
let send_res = udp_socket.send(data).await;
if let Err(e) = send_res {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
info!("VUDP: Sent packet discovery request");
// Handle the ip discovery response
let receieved_size = udp_socket.recv(&mut buf).await.unwrap();
let received_size_or_err = udp_socket.recv(&mut buf).await;
if let Err(e) = received_size_or_err {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
let received_size = received_size_or_err.unwrap();
info!(
"VUDP: Receiving messsage: {:?} - (expected {} vs real {})",
buf.clone(),
size,
receieved_size
received_size
);
let receieved_ip_discovery = IpDiscoveryPacket::new(&buf).unwrap();
let receieved_ip_discovery = IpDiscoveryPacket::new(&buf).expect("Could not make ipdiscovery packet from received data, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
info!(
"VUDP: Received ip discovery!!! {:?}",
@ -121,11 +134,11 @@ impl UdpHandler {
handler.listen_task().await;
});
UdpHandle {
Ok(UdpHandle {
events: shared_events,
socket,
data: data_reference,
}
})
}
/// The main listen task;

View File

@ -4,9 +4,11 @@
/// This always adds up to 12 bytes
const RTP_HEADER_SIZE: u8 = 12;
pub mod handle;
pub mod backends;
pub mod events;
pub mod handle;
pub mod handler;
pub use backends::*;
pub use handle::*;
pub use handler::*;