diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index a4f27a1..7586d6f 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -94,6 +94,12 @@ pub struct GatewayEvent { } impl GatewayEvent { + pub fn new() -> Self { + Self { + observers: Vec::new(), + } + } + /// Returns true if the GatewayEvent is observed by at least one Observer. pub fn is_observed(&self) -> bool { !self.observers.is_empty() diff --git a/src/types/events/voice_gateway/speaking.rs b/src/types/events/voice_gateway/speaking.rs index adbbe00..c31e7e1 100644 --- a/src/types/events/voice_gateway/speaking.rs +++ b/src/types/events/voice_gateway/speaking.rs @@ -14,7 +14,7 @@ pub struct Speaking { /// /// See [SpeakingBitFlags] pub speaking: u8, - pub ssrc: i32, + pub ssrc: u32, /// The user id of the speaking user, only sent by the server #[serde(skip_serializing)] pub user_id: Option, diff --git a/src/voice/crypto.rs b/src/voice/crypto.rs index 6bba0ad..ccf39b6 100644 --- a/src/voice/crypto.rs +++ b/src/voice/crypto.rs @@ -2,12 +2,9 @@ //! //! All functions in this module return a 24 byte long [Vec]. -use discortp::Packet; - /// Gets an xsalsa20poly1305 nonce from an rtppacket. -pub(crate) fn get_xsalsa20_poly1305_nonce(packet: discortp::rtp::RtpPacket) -> Vec { - - let mut rtp_header = packet.packet()[0..12].to_vec(); +pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec { + let mut rtp_header = packet[0..12].to_vec(); // The header is only 12 bytes, but the nonce has to be 24 // This actually works mind you, and anything else doesn't diff --git a/src/voice/gateway.rs b/src/voice/gateway.rs index c8f374a..3616cd3 100644 --- a/src/voice/gateway.rs +++ b/src/voice/gateway.rs @@ -671,13 +671,16 @@ struct VoiceHeartbeatThreadCommunication { } pub mod voice_events { - use crate::types::{ - SessionDescription, SessionUpdate, VoiceBackendVersion, VoiceClientConnectFlags, - VoiceClientConnectPlatform, VoiceClientDisconnection, VoiceMediaSinkWants, VoiceReady, + use crate::{ + errors::VoiceGatewayError, + gateway::GatewayEvent, + types::{ + SessionDescription, SessionUpdate, Speaking, SsrcDefinition, VoiceBackendVersion, + VoiceClientConnectFlags, VoiceClientConnectPlatform, VoiceClientDisconnection, + VoiceMediaSinkWants, VoiceReady, + }, }; - use super::*; - #[derive(Default, Debug)] pub struct VoiceEvents { pub voice_ready: GatewayEvent, diff --git a/src/voice/mod.rs b/src/voice/mod.rs index d1c08f7..8d84a0a 100644 --- a/src/voice/mod.rs +++ b/src/voice/mod.rs @@ -1,9 +1,9 @@ //! Module for all voice functionality within chorus. +mod crypto; pub mod gateway; pub mod udp; pub mod voice_data; -mod crypto; // Pub use this so users can interact with packet types if they want pub use discortp; diff --git a/src/voice/udp.rs b/src/voice/udp.rs index 1c9d2d6..126cc0a 100644 --- a/src/voice/udp.rs +++ b/src/voice/udp.rs @@ -1,11 +1,16 @@ //! Defines voice raw udp socket handling +use self::voice_udp_events::VoiceUDPEvents; + use super::crypto; use std::{net::SocketAddr, sync::Arc}; use log::{debug, info, trace, warn}; -use tokio::{net::UdpSocket, sync::RwLock}; +use tokio::{ + net::UdpSocket, + sync::{Mutex, RwLock}, +}; use crypto_secretbox::{ aead::Aead, cipher::generic_array::GenericArray, KeyInit, XSalsa20Poly1305, @@ -14,6 +19,7 @@ use crypto_secretbox::{ use discortp::{ demux::{demux, Demuxed}, discord::{IpDiscovery, IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket}, + rtcp::report::{ReceiverReport, SenderReport}, Packet, }; @@ -28,6 +34,7 @@ const RTP_HEADER_SIZE: u8 = 12; /// Can be safely cloned and will still correspond to the same connection. #[derive(Debug, Clone)] pub struct UdpHandle { + pub events: Arc>, socket: Arc, pub data: Arc>, } @@ -75,14 +82,17 @@ impl UdpHandle { /// Encrypts and sends and rtp packet. pub async fn send_rtp_packet(&self, packet: discortp::rtp::MutableRtpPacket<'_>) { - let mut mutable_packet = packet; - self.encrypt_rtp_packet(&mut mutable_packet).await; - self.send_encrypted_rtp_packet(mutable_packet.consume_to_immutable()) + let mut buffer = self.encrypt_rtp_packet_payload(&packet).await; + let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap(); + self.send_encrypted_rtp_packet(new_packet.consume_to_immutable()) .await; } - /// Encrypts an unecnrypted rtp packet, mutating its payload. - pub async fn encrypt_rtp_packet(&self, packet: &mut discortp::rtp::MutableRtpPacket<'_>) { + /// Encrypts an unencrypted rtp packet, returning an encrypted copy if its payload. + pub async fn encrypt_rtp_packet_payload( + &self, + packet: &discortp::rtp::MutableRtpPacket<'_>, + ) -> Vec { let payload = packet.payload(); let data_lock = self.data.read().await; @@ -97,7 +107,7 @@ impl UdpHandle { let session_description = session_description_result.unwrap(); - let nonce_bytes = crypto::get_xsalsa20_poly1305_nonce(packet.to_immutable()); + let nonce_bytes = crypto::get_xsalsa20_poly1305_nonce(packet.packet()); let nonce = GenericArray::from_slice(&nonce_bytes); let key = GenericArray::from_slice(&session_description.secret_key); @@ -113,7 +123,18 @@ impl UdpHandle { let encrypted_payload = encryption_result.unwrap(); - packet.set_payload(&encrypted_payload); + // We need to allocate a new buffer, since the old one is too small for our new encrypted + // data + let mut new_buffer = packet.packet().to_vec(); + + let buffer_size = encrypted_payload.len() + RTP_HEADER_SIZE as usize; + + // Fill the buffer + while new_buffer.len() <= buffer_size { + new_buffer.push(0); + } + + new_buffer } /// Sends an (already encrypted) rtp packet to the connection. @@ -121,11 +142,14 @@ impl UdpHandle { let raw_bytes = packet.packet(); self.socket.send(raw_bytes).await.unwrap(); + + debug!("VUDP: Sent rtp packet!"); } } #[derive(Debug)] pub struct UdpHandler { + events: Arc>, pub data: Arc>, socket: Arc, } @@ -207,7 +231,11 @@ impl UdpHandler { let socket = Arc::new(udp_socket); + let events = VoiceUDPEvents::default(); + let shared_events = Arc::new(Mutex::new(events)); + let mut handler = UdpHandler { + events: shared_events.clone(), data: data_reference.clone(), socket: socket.clone(), }; @@ -218,6 +246,7 @@ impl UdpHandler { }); UdpHandle { + events: shared_events, socket, data: data_reference, } @@ -252,11 +281,7 @@ impl UdpHandler { match parsed { Demuxed::Rtp(rtp) => { let ciphertext = buf[12..buf.len()].to_vec(); - trace!( - "VUDP: Parsed packet as rtp! {:?}; data: {:?}", - rtp, - ciphertext - ); + trace!("VUDP: Parsed packet as rtp!"); let data_lock = self.data.read().await; @@ -273,7 +298,7 @@ impl UdpHandler { match session_description.encryption_mode { crate::types::VoiceEncryptionMode::Xsalsa20Poly1305 => { - nonce_bytes = crypto::get_xsalsa20_poly1305_nonce(rtp); + nonce_bytes = crypto::get_xsalsa20_poly1305_nonce(rtp.packet()); } _ => { unimplemented!(); @@ -298,10 +323,66 @@ impl UdpHandler { let decrypted = decryption_result.unwrap(); - info!("VUDP: SUCCESSFULLY DECRYPTED VOICE DATA!!! {:?}", decrypted); + debug!("VUDP: Successfully decrypted voice data!"); + + let rtp_with_decrypted_data = discortp::rtp::Rtp { + ssrc: rtp.get_ssrc(), + marker: rtp.get_marker(), + version: rtp.get_version(), + padding: rtp.get_padding(), + sequence: rtp.get_sequence(), + extension: rtp.get_extension(), + timestamp: rtp.get_timestamp(), + csrc_list: rtp.get_csrc_list(), + csrc_count: rtp.get_csrc_count(), + payload_type: rtp.get_payload_type(), + payload: decrypted, + }; + + self.events + .lock() + .await + .rtp + .notify(rtp_with_decrypted_data) + .await; } Demuxed::Rtcp(rtcp) => { - trace!("VUDP: Parsed packet as rtcp! {:?}", rtcp); + trace!("VUDP: Parsed packet as rtcp!"); + + let rtcp_data; + + match rtcp { + discortp::rtcp::RtcpPacket::KnownType(knowntype) => { + rtcp_data = discortp::rtcp::Rtcp::KnownType(knowntype); + } + discortp::rtcp::RtcpPacket::SenderReport(senderreport) => { + rtcp_data = discortp::rtcp::Rtcp::SenderReport(SenderReport { + payload: senderreport.payload().to_vec(), + padding: senderreport.get_padding(), + version: senderreport.get_version(), + ssrc: senderreport.get_ssrc(), + pkt_length: senderreport.get_pkt_length(), + packet_type: senderreport.get_packet_type(), + rx_report_count: senderreport.get_rx_report_count(), + }); + } + discortp::rtcp::RtcpPacket::ReceiverReport(receiverreport) => { + rtcp_data = discortp::rtcp::Rtcp::ReceiverReport(ReceiverReport { + payload: receiverreport.payload().to_vec(), + padding: receiverreport.get_padding(), + version: receiverreport.get_version(), + ssrc: receiverreport.get_ssrc(), + pkt_length: receiverreport.get_pkt_length(), + packet_type: receiverreport.get_packet_type(), + rx_report_count: receiverreport.get_rx_report_count(), + }); + } + _ => { + unreachable!(); + } + } + + self.events.lock().await.rtcp.notify(rtcp_data).await; } Demuxed::FailedParse(e) => { trace!("VUDP: Failed to parse packet: {:?}", e); @@ -312,3 +393,28 @@ impl UdpHandler { } } } + +pub mod voice_udp_events { + + use discortp::{rtcp::Rtcp, rtp::Rtp}; + + use crate::{gateway::GatewayEvent, types::WebSocketEvent}; + + impl WebSocketEvent for Rtp {} + impl WebSocketEvent for Rtcp {} + + #[derive(Debug)] + pub struct VoiceUDPEvents { + pub rtp: GatewayEvent, + pub rtcp: GatewayEvent, + } + + impl Default for VoiceUDPEvents { + fn default() -> Self { + Self { + rtp: GatewayEvent::new(), + rtcp: GatewayEvent::new(), + } + } + } +}