From aac31726ec6721112b128fd4aecd71df925abd0b Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Tue, 30 Apr 2024 16:17:47 +0200 Subject: [PATCH] Reuse gateway backends, don't duplicate them for voice gateway (#493) --- src/errors.rs | 4 +- src/gateway/backends/tungstenite.rs | 18 +++--- src/gateway/backends/wasm.rs | 10 +--- src/gateway/gateway.rs | 9 ++- src/voice/gateway/backends/mod.rs | 17 ------ src/voice/gateway/backends/tungstenite.rs | 68 ++--------------------- src/voice/gateway/backends/wasm.rs | 29 +--------- src/voice/gateway/gateway.rs | 20 ++++--- src/voice/gateway/handle.rs | 13 +++-- src/voice/gateway/heartbeat.rs | 4 +- 10 files changed, 50 insertions(+), 142 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 722921a..0d130dd 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -93,7 +93,7 @@ custom_error! { DisallowedIntents = "You sent a disallowed intent. You may have tried to specify an intent that you have not enabled or are not approved for", // Errors when initiating a gateway connection - CannotConnect{error: String} = "Cannot connect due to a tungstenite error: {error}", + CannotConnect{error: String} = "Cannot connect due to a websocket error: {error}", NonHelloOnInitiate{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong", // Other misc errors @@ -124,7 +124,7 @@ custom_error! { UnknownEncryptionMode = "Server failed to decrypt data", // Errors when initiating a gateway connection - CannotConnect{error: String} = "Cannot connect due to a tungstenite error: {error}", + CannotConnect{error: String} = "Cannot connect due to a websocket error: {error}", NonHelloOnInitiate{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong", // Other misc errors diff --git a/src/gateway/backends/tungstenite.rs b/src/gateway/backends/tungstenite.rs index a9f9f64..6c7ac39 100644 --- a/src/gateway/backends/tungstenite.rs +++ b/src/gateway/backends/tungstenite.rs @@ -2,6 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. +use custom_error::custom_error; use futures_util::{ stream::{SplitSink, SplitStream}, StreamExt, @@ -11,7 +12,6 @@ use tokio_tungstenite::{ connect_async_tls_with_config, tungstenite, Connector, MaybeTlsStream, WebSocketStream, }; -use crate::errors::GatewayError; use crate::gateway::GatewayMessage; #[derive(Debug, Clone)] @@ -22,18 +22,22 @@ pub type TungsteniteSink = SplitSink>, tungstenite::Message>; pub type TungsteniteStream = SplitStream>>; +custom_error! { + pub TungsteniteBackendError + FailedToLoadCerts{error: std::io::Error} = "failed to load platform native certs: {error}", + TungsteniteError{error: tungstenite::error::Error} = "encountered a tungstenite error: {error}", +} + impl TungsteniteBackend { pub async fn connect( websocket_url: &str, - ) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::GatewayError> { + ) -> Result<(TungsteniteSink, TungsteniteStream), TungsteniteBackendError> { let mut roots = rustls::RootCertStore::empty(); let certs = rustls_native_certs::load_native_certs(); if let Err(e) = certs { log::error!("Failed to load platform native certs! {:?}", e); - return Err(GatewayError::CannotConnect { - error: format!("{:?}", e), - }); + return Err(TungsteniteBackendError::FailedToLoadCerts { error: e }); } for cert in certs.unwrap() { @@ -55,8 +59,8 @@ impl TungsteniteBackend { { Ok(websocket_stream) => websocket_stream, Err(e) => { - return Err(GatewayError::CannotConnect { - error: e.to_string(), + return Err(TungsteniteBackendError::TungsteniteError { + error: e, }) } }; diff --git a/src/gateway/backends/wasm.rs b/src/gateway/backends/wasm.rs index 83f4b37..a40321d 100644 --- a/src/gateway/backends/wasm.rs +++ b/src/gateway/backends/wasm.rs @@ -9,7 +9,6 @@ use futures_util::{ use ws_stream_wasm::*; -use crate::errors::GatewayError; use crate::gateway::GatewayMessage; #[derive(Debug, Clone)] @@ -22,13 +21,8 @@ pub type WasmStream = SplitStream; impl WasmBackend { pub async fn connect( websocket_url: &str, - ) -> Result<(WasmSink, WasmStream), crate::errors::GatewayError> { - let (_, websocket_stream) = match WsMeta::connect(websocket_url, None).await { - Ok(stream) => Ok(stream), - Err(e) => Err(GatewayError::CannotConnect { - error: e.to_string(), - }), - }?; + ) -> Result<(WasmSink, WasmStream), ws_stream_wasm::WsErr> { + let (_, websocket_stream) = WsMeta::connect(websocket_url, None).await?; Ok(websocket_stream.split()) } diff --git a/src/gateway/gateway.rs b/src/gateway/gateway.rs index dabfeb6..34808c9 100644 --- a/src/gateway/gateway.rs +++ b/src/gateway/gateway.rs @@ -35,7 +35,14 @@ impl Gateway { #[allow(clippy::new_ret_no_self)] pub async fn spawn(websocket_url: String) -> Result { let (websocket_send, mut websocket_receive) = - WebSocketBackend::connect(&websocket_url).await?; + match WebSocketBackend::connect(&websocket_url).await { + Ok(streams) => streams, + Err(e) => { + return Err(GatewayError::CannotConnect { + error: format!("{:?}", e), + }) + } + }; let shared_websocket_send = Arc::new(Mutex::new(websocket_send)); diff --git a/src/voice/gateway/backends/mod.rs b/src/voice/gateway/backends/mod.rs index 7f3f3dd..23f2767 100644 --- a/src/voice/gateway/backends/mod.rs +++ b/src/voice/gateway/backends/mod.rs @@ -4,24 +4,7 @@ #[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))] pub mod tungstenite; -#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))] -pub use tungstenite::*; #[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))] pub mod wasm; -#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))] -pub use wasm::*; -#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))] -pub type Sink = tungstenite::TungsteniteSink; -#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))] -pub type Stream = tungstenite::TungsteniteStream; -#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))] -pub type WebSocketBackend = tungstenite::TungsteniteBackend; - -#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))] -pub type Sink = wasm::WasmSink; -#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))] -pub type Stream = wasm::WasmStream; -#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))] -pub type WebSocketBackend = wasm::WasmBackend; diff --git a/src/voice/gateway/backends/tungstenite.rs b/src/voice/gateway/backends/tungstenite.rs index 26cc0fe..599274d 100644 --- a/src/voice/gateway/backends/tungstenite.rs +++ b/src/voice/gateway/backends/tungstenite.rs @@ -2,76 +2,16 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use futures_util::{ - stream::{SplitSink, SplitStream}, - StreamExt, -}; -use tokio::net::TcpStream; -use tokio_tungstenite::{ - connect_async_tls_with_config, tungstenite, Connector, MaybeTlsStream, WebSocketStream, -}; +use crate::voice::gateway::VoiceGatewayMessage; -use crate::{errors::VoiceGatewayError, voice::gateway::VoiceGatewayMessage}; - -#[derive(Debug, Clone)] -pub struct TungsteniteBackend; - -// These could be made into inherent associated types when that's stabilized -pub type TungsteniteSink = - SplitSink>, tungstenite::Message>; -pub type TungsteniteStream = SplitStream>>; - -impl TungsteniteBackend { - pub async fn connect( - websocket_url: &str, - ) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::VoiceGatewayError> { - let mut roots = rustls::RootCertStore::empty(); - let certs = rustls_native_certs::load_native_certs(); - - if let Err(e) = certs { - log::error!("Failed to load platform native certs! {:?}", e); - return Err(VoiceGatewayError::CannotConnect { - error: format!("{:?}", e), - }); - } - - for cert in certs.unwrap() { - roots.add(&rustls::Certificate(cert.0)).unwrap(); - } - let (websocket_stream, _) = match connect_async_tls_with_config( - websocket_url, - None, - false, - Some(Connector::Rustls( - rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(roots) - .with_no_client_auth() - .into(), - )), - ) - .await - { - Ok(websocket_stream) => websocket_stream, - Err(e) => { - return Err(VoiceGatewayError::CannotConnect { - error: e.to_string(), - }) - } - }; - - Ok(websocket_stream.split()) - } -} - -impl From for tungstenite::Message { +impl From for tokio_tungstenite::tungstenite::Message { fn from(message: VoiceGatewayMessage) -> Self { Self::Text(message.0) } } -impl From for VoiceGatewayMessage { - fn from(value: tungstenite::Message) -> Self { +impl From for VoiceGatewayMessage { + fn from(value: tokio_tungstenite::tungstenite::Message) -> Self { Self(value.to_string()) } } diff --git a/src/voice/gateway/backends/wasm.rs b/src/voice/gateway/backends/wasm.rs index a39723e..611b202 100644 --- a/src/voice/gateway/backends/wasm.rs +++ b/src/voice/gateway/backends/wasm.rs @@ -2,36 +2,9 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -use futures_util::{ - stream::{SplitSink, SplitStream}, - StreamExt, -}; - -use ws_stream_wasm::*; - -use crate::errors::VoiceGatewayError; +use ws_stream_wasm::WsMessage; use crate::voice::gateway::VoiceGatewayMessage; -#[derive(Debug, Clone)] -pub struct WasmBackend; - -// These could be made into inherent associated types when that's stabilized -pub type WasmSink = SplitSink; -pub type WasmStream = SplitStream; - -impl WasmBackend { - pub async fn connect(websocket_url: &str) -> Result<(WasmSink, WasmStream), VoiceGatewayError> { - let (_, websocket_stream) = match WsMeta::connect(websocket_url, None).await { - Ok(stream) => Ok(stream), - Err(e) => Err(VoiceGatewayError::CannotConnect { - error: e.to_string(), - }), - }?; - - Ok(websocket_stream.split()) - } -} - impl From for WsMessage { fn from(message: VoiceGatewayMessage) -> Self { Self::Text(message.0) diff --git a/src/voice/gateway/gateway.rs b/src/voice/gateway/gateway.rs index 4727ae4..9a2a60b 100644 --- a/src/voice/gateway/gateway.rs +++ b/src/voice/gateway/gateway.rs @@ -11,6 +11,9 @@ use tokio::sync::Mutex; use futures_util::SinkExt; use futures_util::StreamExt; +use crate::gateway::Sink; +use crate::gateway::Stream; +use crate::gateway::WebSocketBackend; use crate::{ errors::VoiceGatewayError, gateway::GatewayEvent, @@ -21,14 +24,10 @@ use crate::{ VOICE_READY, VOICE_RESUME, VOICE_SELECT_PROTOCOL, VOICE_SESSION_DESCRIPTION, VOICE_SESSION_UPDATE, VOICE_SPEAKING, VOICE_SSRC_DEFINITION, }, - voice::gateway::{ - heartbeat::VoiceHeartbeatThreadCommunication, VoiceGatewayMessage, WebSocketBackend, - }, + voice::gateway::{heartbeat::VoiceHeartbeatThreadCommunication, VoiceGatewayMessage}, }; -use super::{ - events::VoiceEvents, heartbeat::VoiceHeartbeatHandler, Sink, Stream, VoiceGatewayHandle, -}; +use super::{events::VoiceEvents, heartbeat::VoiceHeartbeatHandler, VoiceGatewayHandle}; #[derive(Debug)] pub struct VoiceGateway { @@ -48,7 +47,14 @@ impl VoiceGateway { trace!("Created voice socket url: {}", processed_url.clone()); let (websocket_send, mut websocket_receive) = - WebSocketBackend::connect(&processed_url).await?; + match WebSocketBackend::connect(&processed_url).await { + Ok(streams) => streams, + Err(e) => { + return Err(VoiceGatewayError::CannotConnect { + error: format!("{:?}", e), + }) + } + }; let shared_websocket_send = Arc::new(Mutex::new(websocket_send)); diff --git a/src/voice/gateway/handle.rs b/src/voice/gateway/handle.rs index b48080a..8750f12 100644 --- a/src/voice/gateway/handle.rs +++ b/src/voice/gateway/handle.rs @@ -11,13 +11,16 @@ use futures_util::SinkExt; use serde_json::json; use tokio::sync::Mutex; -use crate::types::{ - SelectProtocol, Speaking, SsrcDefinition, VoiceGatewaySendPayload, VoiceIdentify, - VOICE_BACKEND_VERSION, VOICE_IDENTIFY, VOICE_SELECT_PROTOCOL, VOICE_SPEAKING, - VOICE_SSRC_DEFINITION, +use crate::{ + gateway::Sink, + types::{ + SelectProtocol, Speaking, SsrcDefinition, VoiceGatewaySendPayload, VoiceIdentify, + VOICE_BACKEND_VERSION, VOICE_IDENTIFY, VOICE_SELECT_PROTOCOL, VOICE_SPEAKING, + VOICE_SSRC_DEFINITION, + }, }; -use super::{events::VoiceEvents, Sink, VoiceGatewayMessage}; +use super::{events::VoiceEvents, VoiceGatewayMessage}; /// Represents a handle to a Voice Gateway connection. /// Using this handle you can send Gateway Events directly. diff --git a/src/voice/gateway/heartbeat.rs b/src/voice/gateway/heartbeat.rs index 2b9fde5..945bfbd 100644 --- a/src/voice/gateway/heartbeat.rs +++ b/src/voice/gateway/heartbeat.rs @@ -26,13 +26,11 @@ use tokio::sync::{ use tokio::task; use crate::{ - gateway::heartbeat::HEARTBEAT_ACK_TIMEOUT, + gateway::{heartbeat::HEARTBEAT_ACK_TIMEOUT, Sink}, types::{VoiceGatewaySendPayload, VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK}, voice::gateway::VoiceGatewayMessage, }; -use super::Sink; - /// Handles sending heartbeats to the voice gateway in another thread #[allow(dead_code)] // FIXME: Remove this, once all fields of VoiceHeartbeatHandler are used #[derive(Debug)]