diff --git a/src/gateway/default/gateway.rs b/src/gateway/default/gateway.rs index a8bb69c..73178ba 100644 --- a/src/gateway/default/gateway.rs +++ b/src/gateway/default/gateway.rs @@ -8,7 +8,7 @@ use crate::types::{self, WebSocketEvent}; #[derive(Debug)] pub struct DefaultGateway { events: Arc>, - heartbeat_handler: HeartbeatHandler, + heartbeat_handler: HeartbeatHandler>>, websocket_send: Arc< Mutex< SplitSink< @@ -30,7 +30,9 @@ impl WebSocketStream>, > for DefaultGateway { - fn get_heartbeat_handler(&self) -> &HeartbeatHandler { + fn get_heartbeat_handler( + &self, + ) -> &HeartbeatHandler>> { &self.heartbeat_handler } @@ -171,7 +173,6 @@ impl DefaultGateway { /// Deserializes and updates a dispatched event, when we already know its type; /// (Called for every event in handle_message) - #[allow(dead_code)] // TODO: Remove this allow annotation async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>( data: &'a str, event: &mut GatewayEvent, diff --git a/src/gateway/default/heartbeat.rs b/src/gateway/default/heartbeat.rs deleted file mode 100644 index 4b9fff7..0000000 --- a/src/gateway/default/heartbeat.rs +++ /dev/null @@ -1,50 +0,0 @@ -use super::*; - -// TODO: Make me not a trait and delete this file -#[async_trait] -impl - HeartbeatHandlerCapable< - tokio_tungstenite::tungstenite::Message, - WebSocketStream>, - > for HeartbeatHandler -{ - fn get_send(&self) -> &Sender { - &self.send - } - - fn get_heartbeat_interval(&self) -> Duration { - self.heartbeat_interval - } - - fn new( - heartbeat_interval: Duration, - websocket_tx: Arc< - Mutex< - SplitSink< - WebSocketStream>, - tokio_tungstenite::tungstenite::Message, - >, - >, - >, - kill_rc: tokio::sync::broadcast::Receiver<()>, - ) -> HeartbeatHandler { - let (send, receive) = tokio::sync::mpsc::channel(32); - let kill_receive = kill_rc.resubscribe(); - - let handle: JoinHandle<()> = task::spawn(async move { - HeartbeatHandler::heartbeat_task( - websocket_tx, - heartbeat_interval, - receive, - kill_receive, - ) - .await; - }); - - Self { - heartbeat_interval, - send, - handle, - } - } -} diff --git a/src/gateway/default/mod.rs b/src/gateway/default/mod.rs index ae9db05..d83b64e 100644 --- a/src/gateway/default/mod.rs +++ b/src/gateway/default/mod.rs @@ -1,6 +1,5 @@ pub mod gateway; pub mod handle; -pub mod heartbeat; use super::*; pub use gateway::*; @@ -15,16 +14,12 @@ use std::fmt::Debug; use std::sync::{Arc, RwLock}; use std::time::Duration; -use futures_util::stream::SplitSink; -use futures_util::stream::SplitStream; +use futures_util::stream::{SplitSink, SplitStream}; use log::{info, warn}; -use tokio::net::TcpStream; -use tokio::sync::mpsc::Sender; -use tokio::sync::Mutex; -use tokio::task; -use tokio::task::JoinHandle; -use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::{connect_async_tls_with_config, Connector, WebSocketStream}; +use tokio::{net::TcpStream, sync::Mutex, task}; +use tokio_tungstenite::{ + connect_async_tls_with_config, Connector, MaybeTlsStream, WebSocketStream, +}; impl crate::gateway::MessageCapable for tokio_tungstenite::tungstenite::Message { fn as_string(&self) -> Option { diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 44c8ee2..3318d6b 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -10,7 +10,7 @@ pub mod wasm; pub use default::*; pub use message::*; use safina_timer::sleep_until; -use tokio::task::JoinHandle; +use tokio::task::{self, JoinHandle}; // TODO: Uncomment for Prod! // #[cfg(all(target_arch = "wasm32", feature = "client"))] pub use wasm::*; @@ -25,6 +25,7 @@ use crate::types::{ use std::any::Any; use std::collections::HashMap; +use std::marker::PhantomData; use std::sync::{Arc, RwLock}; use std::time::{self, Duration, Instant}; @@ -169,7 +170,7 @@ where fn get_websocket_send(&self) -> Arc>>; fn get_store(&self) -> GatewayStore; fn get_url(&self) -> String; - fn get_heartbeat_handler(&self) -> &HeartbeatHandler; + fn get_heartbeat_handler(&self) -> &HeartbeatHandler; /// Returns a Result with a matching impl of [`GatewayHandleCapable`], or a [`GatewayError`] /// /// DOCUMENTME: Explain what this method has to do to be a good get_handle() impl, or link to such documentation @@ -379,7 +380,7 @@ where op_code: Some(GATEWAY_HEARTBEAT), }; - let heartbeat_thread_communicator = self.get_heartbeat_handler().get_send(); + let heartbeat_thread_communicator = self.get_heartbeat_handler().send; heartbeat_thread_communicator .send(heartbeat_communication) @@ -408,7 +409,7 @@ where }; let heartbeat_handler = self.get_heartbeat_handler(); - let heartbeat_thread_communicator = heartbeat_handler.get_send(); + let heartbeat_thread_communicator = heartbeat_handler.send; heartbeat_thread_communicator .send(heartbeat_communication) @@ -441,7 +442,7 @@ where }; let heartbeat_handler = self.get_heartbeat_handler(); - let heartbeat_thread_communicator = heartbeat_handler.get_send(); + let heartbeat_thread_communicator = heartbeat_handler.send; heartbeat_thread_communicator .send(heartbeat_communication) .await @@ -559,19 +560,19 @@ where } /// Handles sending heartbeats to the gateway in another thread -#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used #[derive(Debug)] -pub struct HeartbeatHandler { +pub struct HeartbeatHandler> { /// How ofter heartbeats need to be sent at a minimum pub heartbeat_interval: Duration, /// The send channel for the heartbeat thread pub send: Sender, /// The handle of the thread handle: JoinHandle<()>, + hb_type: (PhantomData, PhantomData), } -impl HeartbeatHandler { - pub async fn heartbeat_task + Send>( +impl + Send> HeartbeatHandler { + pub async fn heartbeat_task( websocket_tx: Arc>>, heartbeat_interval: Duration, mut receive: tokio::sync::mpsc::Receiver, @@ -651,17 +652,30 @@ impl HeartbeatHandler { } } } -} -#[async_trait] -// TODO: Make me not a trait!! -pub trait HeartbeatHandlerCapable> { - fn get_send(&self) -> &Sender; - fn get_heartbeat_interval(&self) -> Duration; - #[allow(clippy::new_ret_no_self)] - // TODO: new() has duplicated code in wasm and default impl. Can be fixed, if this is not a trait + fn new( heartbeat_interval: Duration, websocket_tx: Arc>>, kill_rc: tokio::sync::broadcast::Receiver<()>, - ) -> HeartbeatHandler; + ) -> HeartbeatHandler { + let (send, receive) = tokio::sync::mpsc::channel(32); + let kill_receive = kill_rc.resubscribe(); + + let handle: JoinHandle<()> = task::spawn(async move { + HeartbeatHandler::heartbeat_task( + websocket_tx, + heartbeat_interval, + receive, + kill_receive, + ) + .await; + }); + + Self { + heartbeat_interval, + send, + handle, + hb_type: (PhantomData::, PhantomData::), + } + } } diff --git a/src/gateway/wasm/gateway.rs b/src/gateway/wasm/gateway.rs index cdbb991..b7ca51b 100644 --- a/src/gateway/wasm/gateway.rs +++ b/src/gateway/wasm/gateway.rs @@ -12,7 +12,7 @@ use crate::types::{self, GatewayReceivePayload}; #[derive(Debug)] pub struct WasmGateway { events: Arc>, - heartbeat_handler: HeartbeatHandler, + heartbeat_handler: HeartbeatHandler, websocket_send: Arc>>, websocket_receive: SplitStream, kill_send: tokio::sync::broadcast::Sender<()>, @@ -110,7 +110,7 @@ impl GatewayCapable for WasmGateway { )) } - fn get_heartbeat_handler(&self) -> &HeartbeatHandler { + fn get_heartbeat_handler(&self) -> &HeartbeatHandler { &self.heartbeat_handler } diff --git a/src/gateway/wasm/heartbeat.rs b/src/gateway/wasm/heartbeat.rs deleted file mode 100644 index 80f863e..0000000 --- a/src/gateway/wasm/heartbeat.rs +++ /dev/null @@ -1,40 +0,0 @@ -use tokio::task::{self, JoinHandle}; -use ws_stream_wasm::*; - -use super::*; - -#[async_trait] -impl HeartbeatHandlerCapable for HeartbeatHandler { - fn get_send(&self) -> &Sender { - &self.send - } - - fn get_heartbeat_interval(&self) -> Duration { - self.heartbeat_interval - } - - fn new( - heartbeat_interval: Duration, - websocket_tx: Arc>>, - kill_rc: tokio::sync::broadcast::Receiver<()>, - ) -> HeartbeatHandler { - let (send, receive) = tokio::sync::mpsc::channel(32); - let kill_receive = kill_rc.resubscribe(); - - let handle: JoinHandle<()> = task::spawn(async move { - HeartbeatHandler::heartbeat_task( - websocket_tx, - heartbeat_interval, - receive, - kill_receive, - ) - .await; - }); - - Self { - heartbeat_interval, - send, - handle, - } - } -} diff --git a/src/gateway/wasm/mod.rs b/src/gateway/wasm/mod.rs index 6cb87f2..e664ad2 100644 --- a/src/gateway/wasm/mod.rs +++ b/src/gateway/wasm/mod.rs @@ -1,10 +1,8 @@ pub mod gateway; pub mod handle; -pub mod heartbeat; use super::*; pub use gateway::*; pub use handle::*; -pub use heartbeat::*; use ws_stream_wasm::WsMessage; impl crate::gateway::MessageCapable for WsMessage {