Make HBHandler<T,S> struct instead of trait

Make HeartbeatHandler a generic struct instead of a struct with a trait.
Reduces redundant code
This commit is contained in:
bitfl0wer 2023-11-19 17:07:08 +01:00
parent 848bfcd13b
commit 064014474c
No known key found for this signature in database
GPG Key ID: 0ACD574FCF5226CF
7 changed files with 43 additions and 125 deletions

View File

@ -8,7 +8,7 @@ use crate::types::{self, WebSocketEvent};
#[derive(Debug)] #[derive(Debug)]
pub struct DefaultGateway { pub struct DefaultGateway {
events: Arc<Mutex<Events>>, events: Arc<Mutex<Events>>,
heartbeat_handler: HeartbeatHandler, heartbeat_handler: HeartbeatHandler<Message, WebSocketStream<MaybeTlsStream<TcpStream>>>,
websocket_send: Arc< websocket_send: Arc<
Mutex< Mutex<
SplitSink< SplitSink<
@ -30,7 +30,9 @@ impl
WebSocketStream<MaybeTlsStream<TcpStream>>, WebSocketStream<MaybeTlsStream<TcpStream>>,
> for DefaultGateway > for DefaultGateway
{ {
fn get_heartbeat_handler(&self) -> &HeartbeatHandler { fn get_heartbeat_handler(
&self,
) -> &HeartbeatHandler<Message, WebSocketStream<MaybeTlsStream<TcpStream>>> {
&self.heartbeat_handler &self.heartbeat_handler
} }
@ -171,7 +173,6 @@ impl DefaultGateway {
/// Deserializes and updates a dispatched event, when we already know its type; /// Deserializes and updates a dispatched event, when we already know its type;
/// (Called for every event in handle_message) /// (Called for every event in handle_message)
#[allow(dead_code)] // TODO: Remove this allow annotation
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>( async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
data: &'a str, data: &'a str,
event: &mut GatewayEvent<T>, event: &mut GatewayEvent<T>,

View File

@ -1,50 +0,0 @@
use super::*;
// TODO: Make me not a trait and delete this file
#[async_trait]
impl
HeartbeatHandlerCapable<
tokio_tungstenite::tungstenite::Message,
WebSocketStream<MaybeTlsStream<TcpStream>>,
> for HeartbeatHandler
{
fn get_send(&self) -> &Sender<HeartbeatThreadCommunication> {
&self.send
}
fn get_heartbeat_interval(&self) -> Duration {
self.heartbeat_interval
}
fn new(
heartbeat_interval: Duration,
websocket_tx: Arc<
Mutex<
SplitSink<
WebSocketStream<MaybeTlsStream<TcpStream>>,
tokio_tungstenite::tungstenite::Message,
>,
>,
>,
kill_rc: tokio::sync::broadcast::Receiver<()>,
) -> HeartbeatHandler {
let (send, receive) = tokio::sync::mpsc::channel(32);
let kill_receive = kill_rc.resubscribe();
let handle: JoinHandle<()> = task::spawn(async move {
HeartbeatHandler::heartbeat_task(
websocket_tx,
heartbeat_interval,
receive,
kill_receive,
)
.await;
});
Self {
heartbeat_interval,
send,
handle,
}
}
}

View File

@ -1,6 +1,5 @@
pub mod gateway; pub mod gateway;
pub mod handle; pub mod handle;
pub mod heartbeat;
use super::*; use super::*;
pub use gateway::*; pub use gateway::*;
@ -15,16 +14,12 @@ use std::fmt::Debug;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
use futures_util::stream::SplitSink; use futures_util::stream::{SplitSink, SplitStream};
use futures_util::stream::SplitStream;
use log::{info, warn}; use log::{info, warn};
use tokio::net::TcpStream; use tokio::{net::TcpStream, sync::Mutex, task};
use tokio::sync::mpsc::Sender; use tokio_tungstenite::{
use tokio::sync::Mutex; connect_async_tls_with_config, Connector, MaybeTlsStream, WebSocketStream,
use tokio::task; };
use tokio::task::JoinHandle;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::{connect_async_tls_with_config, Connector, WebSocketStream};
impl crate::gateway::MessageCapable for tokio_tungstenite::tungstenite::Message { impl crate::gateway::MessageCapable for tokio_tungstenite::tungstenite::Message {
fn as_string(&self) -> Option<String> { fn as_string(&self) -> Option<String> {

View File

@ -10,7 +10,7 @@ pub mod wasm;
pub use default::*; pub use default::*;
pub use message::*; pub use message::*;
use safina_timer::sleep_until; use safina_timer::sleep_until;
use tokio::task::JoinHandle; use tokio::task::{self, JoinHandle};
// TODO: Uncomment for Prod! // TODO: Uncomment for Prod!
// #[cfg(all(target_arch = "wasm32", feature = "client"))] // #[cfg(all(target_arch = "wasm32", feature = "client"))]
pub use wasm::*; pub use wasm::*;
@ -25,6 +25,7 @@ use crate::types::{
use std::any::Any; use std::any::Any;
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{self, Duration, Instant}; use std::time::{self, Duration, Instant};
@ -169,7 +170,7 @@ where
fn get_websocket_send(&self) -> Arc<Mutex<SplitSink<S, T>>>; fn get_websocket_send(&self) -> Arc<Mutex<SplitSink<S, T>>>;
fn get_store(&self) -> GatewayStore; fn get_store(&self) -> GatewayStore;
fn get_url(&self) -> String; fn get_url(&self) -> String;
fn get_heartbeat_handler(&self) -> &HeartbeatHandler; fn get_heartbeat_handler(&self) -> &HeartbeatHandler<T, S>;
/// Returns a Result with a matching impl of [`GatewayHandleCapable`], or a [`GatewayError`] /// Returns a Result with a matching impl of [`GatewayHandleCapable`], or a [`GatewayError`]
/// ///
/// DOCUMENTME: Explain what this method has to do to be a good get_handle() impl, or link to such documentation /// DOCUMENTME: Explain what this method has to do to be a good get_handle() impl, or link to such documentation
@ -379,7 +380,7 @@ where
op_code: Some(GATEWAY_HEARTBEAT), op_code: Some(GATEWAY_HEARTBEAT),
}; };
let heartbeat_thread_communicator = self.get_heartbeat_handler().get_send(); let heartbeat_thread_communicator = self.get_heartbeat_handler().send;
heartbeat_thread_communicator heartbeat_thread_communicator
.send(heartbeat_communication) .send(heartbeat_communication)
@ -408,7 +409,7 @@ where
}; };
let heartbeat_handler = self.get_heartbeat_handler(); let heartbeat_handler = self.get_heartbeat_handler();
let heartbeat_thread_communicator = heartbeat_handler.get_send(); let heartbeat_thread_communicator = heartbeat_handler.send;
heartbeat_thread_communicator heartbeat_thread_communicator
.send(heartbeat_communication) .send(heartbeat_communication)
@ -441,7 +442,7 @@ where
}; };
let heartbeat_handler = self.get_heartbeat_handler(); let heartbeat_handler = self.get_heartbeat_handler();
let heartbeat_thread_communicator = heartbeat_handler.get_send(); let heartbeat_thread_communicator = heartbeat_handler.send;
heartbeat_thread_communicator heartbeat_thread_communicator
.send(heartbeat_communication) .send(heartbeat_communication)
.await .await
@ -559,19 +560,19 @@ where
} }
/// Handles sending heartbeats to the gateway in another thread /// Handles sending heartbeats to the gateway in another thread
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used
#[derive(Debug)] #[derive(Debug)]
pub struct HeartbeatHandler { pub struct HeartbeatHandler<T: MessageCapable + Send + 'static, S: Sink<T>> {
/// How ofter heartbeats need to be sent at a minimum /// How ofter heartbeats need to be sent at a minimum
pub heartbeat_interval: Duration, pub heartbeat_interval: Duration,
/// The send channel for the heartbeat thread /// The send channel for the heartbeat thread
pub send: Sender<HeartbeatThreadCommunication>, pub send: Sender<HeartbeatThreadCommunication>,
/// The handle of the thread /// The handle of the thread
handle: JoinHandle<()>, handle: JoinHandle<()>,
hb_type: (PhantomData<T>, PhantomData<S>),
} }
impl HeartbeatHandler { impl<T: MessageCapable + Send + 'static, S: Sink<T> + Send> HeartbeatHandler<T, S> {
pub async fn heartbeat_task<T: MessageCapable + Send + 'static, S: Sink<T> + Send>( pub async fn heartbeat_task(
websocket_tx: Arc<Mutex<SplitSink<S, T>>>, websocket_tx: Arc<Mutex<SplitSink<S, T>>>,
heartbeat_interval: Duration, heartbeat_interval: Duration,
mut receive: tokio::sync::mpsc::Receiver<HeartbeatThreadCommunication>, mut receive: tokio::sync::mpsc::Receiver<HeartbeatThreadCommunication>,
@ -651,17 +652,30 @@ impl HeartbeatHandler {
} }
} }
} }
}
#[async_trait]
// TODO: Make me not a trait!!
pub trait HeartbeatHandlerCapable<T: MessageCapable + Send + 'static, S: Sink<T>> {
fn get_send(&self) -> &Sender<HeartbeatThreadCommunication>;
fn get_heartbeat_interval(&self) -> Duration;
#[allow(clippy::new_ret_no_self)]
// TODO: new() has duplicated code in wasm and default impl. Can be fixed, if this is not a trait
fn new( fn new(
heartbeat_interval: Duration, heartbeat_interval: Duration,
websocket_tx: Arc<Mutex<SplitSink<S, T>>>, websocket_tx: Arc<Mutex<SplitSink<S, T>>>,
kill_rc: tokio::sync::broadcast::Receiver<()>, kill_rc: tokio::sync::broadcast::Receiver<()>,
) -> HeartbeatHandler; ) -> HeartbeatHandler<T, S> {
let (send, receive) = tokio::sync::mpsc::channel(32);
let kill_receive = kill_rc.resubscribe();
let handle: JoinHandle<()> = task::spawn(async move {
HeartbeatHandler::heartbeat_task(
websocket_tx,
heartbeat_interval,
receive,
kill_receive,
)
.await;
});
Self {
heartbeat_interval,
send,
handle,
hb_type: (PhantomData::<T>, PhantomData::<S>),
}
}
} }

View File

@ -12,7 +12,7 @@ use crate::types::{self, GatewayReceivePayload};
#[derive(Debug)] #[derive(Debug)]
pub struct WasmGateway { pub struct WasmGateway {
events: Arc<Mutex<Events>>, events: Arc<Mutex<Events>>,
heartbeat_handler: HeartbeatHandler, heartbeat_handler: HeartbeatHandler<WsMessage, WsStream>,
websocket_send: Arc<Mutex<SplitSink<WsStream, WsMessage>>>, websocket_send: Arc<Mutex<SplitSink<WsStream, WsMessage>>>,
websocket_receive: SplitStream<WsStream>, websocket_receive: SplitStream<WsStream>,
kill_send: tokio::sync::broadcast::Sender<()>, kill_send: tokio::sync::broadcast::Sender<()>,
@ -110,7 +110,7 @@ impl GatewayCapable<WsMessage, WsStream> for WasmGateway {
)) ))
} }
fn get_heartbeat_handler(&self) -> &HeartbeatHandler { fn get_heartbeat_handler(&self) -> &HeartbeatHandler<WsMessage, WsStream> {
&self.heartbeat_handler &self.heartbeat_handler
} }

View File

@ -1,40 +0,0 @@
use tokio::task::{self, JoinHandle};
use ws_stream_wasm::*;
use super::*;
#[async_trait]
impl HeartbeatHandlerCapable<WsMessage, WsStream> for HeartbeatHandler {
fn get_send(&self) -> &Sender<HeartbeatThreadCommunication> {
&self.send
}
fn get_heartbeat_interval(&self) -> Duration {
self.heartbeat_interval
}
fn new(
heartbeat_interval: Duration,
websocket_tx: Arc<Mutex<SplitSink<WsStream, WsMessage>>>,
kill_rc: tokio::sync::broadcast::Receiver<()>,
) -> HeartbeatHandler {
let (send, receive) = tokio::sync::mpsc::channel(32);
let kill_receive = kill_rc.resubscribe();
let handle: JoinHandle<()> = task::spawn(async move {
HeartbeatHandler::heartbeat_task(
websocket_tx,
heartbeat_interval,
receive,
kill_receive,
)
.await;
});
Self {
heartbeat_interval,
send,
handle,
}
}
}

View File

@ -1,10 +1,8 @@
pub mod gateway; pub mod gateway;
pub mod handle; pub mod handle;
pub mod heartbeat;
use super::*; use super::*;
pub use gateway::*; pub use gateway::*;
pub use handle::*; pub use handle::*;
pub use heartbeat::*;
use ws_stream_wasm::WsMessage; use ws_stream_wasm::WsMessage;
impl crate::gateway::MessageCapable for WsMessage { impl crate::gateway::MessageCapable for WsMessage {