chore: split voice udp

This commit is contained in:
kozabrada123 2023-12-29 11:33:14 +01:00
parent 9039e216be
commit 8413b66e22
5 changed files with 185 additions and 168 deletions

View File

@ -4,10 +4,11 @@
/// Gets an xsalsa20poly1305 nonce from an rtppacket. /// Gets an xsalsa20poly1305 nonce from an rtppacket.
pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec<u8> { pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec<u8> {
let mut rtp_header = packet[0..12].to_vec();
let mut rtp_header = Vec::with_capacity(24);
rtp_header.append(&mut packet[0..12].to_vec());
// The header is only 12 bytes, but the nonce has to be 24 // The header is only 12 bytes, but the nonce has to be 24
// This actually works mind you, and anything else doesn't
for _i in 0..12 { for _i in 0..12 {
rtp_header.push(0); rtp_header.push(0);
} }

21
src/voice/udp/events.rs Normal file
View File

@ -0,0 +1,21 @@
use discortp::{rtcp::Rtcp, rtp::Rtp};
use crate::{gateway::GatewayEvent, types::WebSocketEvent};
impl WebSocketEvent for Rtp {}
impl WebSocketEvent for Rtcp {}
#[derive(Debug)]
pub struct VoiceUDPEvents {
pub rtp: GatewayEvent<Rtp>,
pub rtcp: GatewayEvent<Rtcp>,
}
impl Default for VoiceUDPEvents {
fn default() -> Self {
Self {
rtp: GatewayEvent::new(),
rtcp: GatewayEvent::new(),
}
}
}

127
src/voice/udp/handle.rs Normal file
View File

@ -0,0 +1,127 @@
use std::sync::Arc;
use crypto_secretbox::{
aead::Aead, cipher::generic_array::GenericArray, KeyInit, XSalsa20Poly1305,
};
use discortp::Packet;
use log::*;
use tokio::{net::UdpSocket, sync::Mutex, sync::RwLock};
use crate::voice::{crypto, voice_data::VoiceData};
use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE};
/// Handle to a voice udp connection
///
/// Can be safely cloned and will still correspond to the same connection.
#[derive(Debug, Clone)]
pub struct UdpHandle {
pub events: Arc<Mutex<VoiceUDPEvents>>,
pub(super) socket: Arc<UdpSocket>,
pub data: Arc<RwLock<VoiceData>>,
}
impl UdpHandle {
/// Constructs and sends encoded opus rtp data.
///
/// Automatically makes an [RtpPacket](discorrtp::rtp::RtpPacket), encrypts it and sends it.
pub async fn send_opus_data(&self, timestamp: u32, payload: Vec<u8>) {
let ssrc = self.data.read().await.ready_data.clone().unwrap().ssrc;
let sequence_number = self.data.read().await.last_sequence_number.wrapping_add(1);
self.data.write().await.last_sequence_number = sequence_number;
let payload_len = payload.len();
let rtp_data = discortp::rtp::Rtp {
// Always the same
version: 2,
padding: 0,
extension: 0,
csrc_count: 0,
csrc_list: Vec::new(),
marker: 0,
payload_type: discortp::rtp::RtpType::Dynamic(120),
// Actually variable
sequence: sequence_number.into(),
timestamp: timestamp.into(),
ssrc,
payload,
};
let buffer_size = payload_len + RTP_HEADER_SIZE as usize;
let mut buffer = vec![0; buffer_size];
let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
rtp_packet.populate(&rtp_data);
self.send_rtp_packet(rtp_packet).await;
}
/// Encrypts and sends and rtp packet.
pub async fn send_rtp_packet(&self, packet: discortp::rtp::MutableRtpPacket<'_>) {
let mut buffer = self.encrypt_rtp_packet_payload(&packet).await;
let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
self.send_encrypted_rtp_packet(new_packet.consume_to_immutable())
.await;
}
/// Encrypts an unencrypted rtp packet, returning a copy of the packet's bytes with an
/// encrypted payload
pub async fn encrypt_rtp_packet_payload(
&self,
packet: &discortp::rtp::MutableRtpPacket<'_>,
) -> Vec<u8> {
let payload = packet.payload();
let session_description_result = self.data.read().await.session_description.clone();
if session_description_result.is_none() {
// FIXME: Make this function reutrn a result with a proper error type for these kinds
// of functions
panic!("Trying to encrypt packet but no key provided yet");
}
let session_description = session_description_result.unwrap();
let nonce_bytes = crypto::get_xsalsa20_poly1305_nonce(packet.packet());
let nonce = GenericArray::from_slice(&nonce_bytes);
let key = GenericArray::from_slice(&session_description.secret_key);
let encryptor = XSalsa20Poly1305::new(key);
let encryption_result = encryptor.encrypt(nonce, payload);
if encryption_result.is_err() {
// FIXME: See above fixme
panic!("Encryption error");
}
let mut encrypted_payload = encryption_result.unwrap();
// We need to allocate a new buffer, since the old one is too small for our new encrypted
// data
let buffer_size = encrypted_payload.len() + RTP_HEADER_SIZE as usize;
let mut new_buffer: Vec<u8> = Vec::with_capacity(buffer_size);
let mut rtp_header = packet.packet().to_vec()[0..RTP_HEADER_SIZE as usize].to_vec();
new_buffer.append(&mut rtp_header);
new_buffer.append(&mut encrypted_payload);
new_buffer
}
/// Sends an (already encrypted) rtp packet to the connection.
pub async fn send_encrypted_rtp_packet(&self, packet: discortp::rtp::RtpPacket<'_>) {
let raw_bytes = packet.packet();
self.socket.send(raw_bytes).await.unwrap();
debug!("VUDP: Sent rtp packet!");
}
}

View File

@ -1,148 +1,32 @@
//! Defines voice raw udp socket handling
use self::voice_udp_events::VoiceUDPEvents;
use super::crypto;
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use log::{debug, info, trace, warn}; use crypto_secretbox::aead::Aead;
use crypto_secretbox::cipher::generic_array::GenericArray;
use crypto_secretbox::KeyInit;
use crypto_secretbox::XSalsa20Poly1305;
use discortp::demux::Demuxed;
use discortp::discord::{
IpDiscovery, IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket,
};
use discortp::rtcp::report::ReceiverReport;
use discortp::rtcp::report::SenderReport;
use discortp::{demux::demux, Packet};
use tokio::{ use tokio::{
net::UdpSocket, net::UdpSocket,
sync::{Mutex, RwLock}, sync::{Mutex, RwLock},
}; };
use crypto_secretbox::{ use crate::voice::crypto::get_xsalsa20_poly1305_nonce;
aead::Aead, cipher::generic_array::GenericArray, KeyInit, XSalsa20Poly1305, use super::RTP_HEADER_SIZE;
}; use crate::voice::voice_data::VoiceData;
use discortp::{ use super::{events::VoiceUDPEvents, UdpHandle};
demux::{demux, Demuxed},
discord::{IpDiscovery, IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket},
rtcp::report::{ReceiverReport, SenderReport},
Packet,
};
use super::voice_data::VoiceData; use log::*;
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#voice-packet-structure>
/// This always adds up to 12
const RTP_HEADER_SIZE: u8 = 12;
/// Handle to a voice udp connection
///
/// Can be safely cloned and will still correspond to the same connection.
#[derive(Debug, Clone)]
pub struct UdpHandle {
pub events: Arc<Mutex<VoiceUDPEvents>>,
socket: Arc<UdpSocket>,
pub data: Arc<RwLock<VoiceData>>,
}
impl UdpHandle {
/// Constructs and sends encoded opus rtp data.
///
/// Automatically makes an [RtpPacket](discorrtp::rtp::RtpPacket), encrypts it and sends it.
pub async fn send_opus_data(&self, timestamp: u32, payload: Vec<u8>) {
let ssrc = self.data.read().await.ready_data.clone().unwrap().ssrc;
let sequence_number = self.data.read().await.last_sequence_number.wrapping_add(1);
self.data.write().await.last_sequence_number = sequence_number;
let payload_len = payload.len();
let rtp_data = discortp::rtp::Rtp {
// Always the same
version: 2,
padding: 0,
extension: 0,
csrc_count: 0,
csrc_list: Vec::new(),
marker: 0,
payload_type: discortp::rtp::RtpType::Dynamic(120),
// Actually variable
sequence: sequence_number.into(),
timestamp: timestamp.into(),
ssrc,
payload,
};
let buffer_size = payload_len + RTP_HEADER_SIZE as usize;
let mut buffer = vec![0; buffer_size];
let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
rtp_packet.populate(&rtp_data);
self.send_rtp_packet(rtp_packet).await;
}
/// Encrypts and sends and rtp packet.
pub async fn send_rtp_packet(&self, packet: discortp::rtp::MutableRtpPacket<'_>) {
let mut buffer = self.encrypt_rtp_packet_payload(&packet).await;
let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
self.send_encrypted_rtp_packet(new_packet.consume_to_immutable())
.await;
}
/// Encrypts an unencrypted rtp packet, returning a copy of the packet's bytes with an
/// encrypted payload
pub async fn encrypt_rtp_packet_payload(
&self,
packet: &discortp::rtp::MutableRtpPacket<'_>,
) -> Vec<u8> {
let payload = packet.payload();
let session_description_result = self.data.read().await.session_description.clone();
if session_description_result.is_none() {
// FIXME: Make this function reutrn a result with a proper error type for these kinds
// of functions
panic!("Trying to encrypt packet but no key provided yet");
}
let session_description = session_description_result.unwrap();
let nonce_bytes = crypto::get_xsalsa20_poly1305_nonce(packet.packet());
let nonce = GenericArray::from_slice(&nonce_bytes);
let key = GenericArray::from_slice(&session_description.secret_key);
let encryptor = XSalsa20Poly1305::new(key);
let encryption_result = encryptor.encrypt(nonce, payload);
if encryption_result.is_err() {
// FIXME: See above fixme
panic!("Encryption error");
}
let mut encrypted_payload = encryption_result.unwrap();
// We need to allocate a new buffer, since the old one is too small for our new encrypted
// data
let buffer_size = encrypted_payload.len() + RTP_HEADER_SIZE as usize;
let mut new_buffer: Vec<u8> = Vec::with_capacity(buffer_size);
let mut rtp_header = packet.packet().to_vec()[0..RTP_HEADER_SIZE as usize].to_vec();
new_buffer.append(&mut rtp_header);
new_buffer.append(&mut encrypted_payload);
new_buffer
}
/// Sends an (already encrypted) rtp packet to the connection.
pub async fn send_encrypted_rtp_packet(&self, packet: discortp::rtp::RtpPacket<'_>) {
let raw_bytes = packet.packet();
self.socket.send(raw_bytes).await.unwrap();
debug!("VUDP: Sent rtp packet!");
}
}
#[derive(Debug)] #[derive(Debug)]
/// The main UDP struct, which handles receiving, parsing and decrypting the rtp packets
pub struct UdpHandler { pub struct UdpHandler {
events: Arc<Mutex<VoiceUDPEvents>>, events: Arc<Mutex<VoiceUDPEvents>>,
pub data: Arc<RwLock<VoiceData>>, pub data: Arc<RwLock<VoiceData>>,
@ -173,15 +57,12 @@ impl UdpHandler {
payload: Vec::new(), payload: Vec::new(),
}; };
let mut buf: Vec<u8> = Vec::new();
let size = IpDiscoveryPacket::minimum_packet_size() + 64; let size = IpDiscoveryPacket::minimum_packet_size() + 64;
for _i in 0..size { let mut buf: Vec<u8> = vec![0; size];
buf.push(0);
}
// TODO: Make this not panic everything // TODO: Make this not panic everything
// Actually, if this panics, something is very, very wrong
let mut ip_discovery_packet = let mut ip_discovery_packet =
MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet"); MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet");
@ -250,7 +131,7 @@ impl UdpHandler {
/// The main listen task; /// The main listen task;
/// ///
/// Receives udp messages and parses them. /// Receives udp messages and parses them.
pub async fn listen_task(&mut self) { async fn listen_task(&mut self) {
loop { loop {
// FIXME: is there a max size for these packets? // FIXME: is there a max size for these packets?
// Allocating 512 bytes seems a bit extreme // Allocating 512 bytes seems a bit extreme
@ -287,7 +168,7 @@ impl UdpHandler {
let nonce_bytes = match session_description.encryption_mode { let nonce_bytes = match session_description.encryption_mode {
crate::types::VoiceEncryptionMode::Xsalsa20Poly1305 => { crate::types::VoiceEncryptionMode::Xsalsa20Poly1305 => {
crypto::get_xsalsa20_poly1305_nonce(rtp.packet()) get_xsalsa20_poly1305_nonce(rtp.packet())
} }
_ => { _ => {
unimplemented!(); unimplemented!();
@ -380,28 +261,3 @@ impl UdpHandler {
} }
} }
} }
pub mod voice_udp_events {
use discortp::{rtcp::Rtcp, rtp::Rtp};
use crate::{gateway::GatewayEvent, types::WebSocketEvent};
impl WebSocketEvent for Rtp {}
impl WebSocketEvent for Rtcp {}
#[derive(Debug)]
pub struct VoiceUDPEvents {
pub rtp: GatewayEvent<Rtp>,
pub rtcp: GatewayEvent<Rtcp>,
}
impl Default for VoiceUDPEvents {
fn default() -> Self {
Self {
rtp: GatewayEvent::new(),
rtcp: GatewayEvent::new(),
}
}
}
}

12
src/voice/udp/mod.rs Normal file
View File

@ -0,0 +1,12 @@
//! Defines the udp component of voice communications, sending and receiving raw rtp data.
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#voice-packet-structure>
/// This always adds up to 12 bytes
const RTP_HEADER_SIZE: u8 = 12;
pub mod handle;
pub mod events;
pub mod handler;
pub use handle::*;
pub use handler::*;