Make HBHandler<T,S> struct instead of trait
Make HeartbeatHandler a generic struct instead of a struct with a trait. Reduces redundant code
This commit is contained in:
parent
70812c529a
commit
19f8403bcf
|
@ -8,7 +8,7 @@ use crate::types::{self, WebSocketEvent};
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct DefaultGateway {
|
pub struct DefaultGateway {
|
||||||
events: Arc<Mutex<Events>>,
|
events: Arc<Mutex<Events>>,
|
||||||
heartbeat_handler: HeartbeatHandler,
|
heartbeat_handler: HeartbeatHandler<Message, WebSocketStream<MaybeTlsStream<TcpStream>>>,
|
||||||
websocket_send: Arc<
|
websocket_send: Arc<
|
||||||
Mutex<
|
Mutex<
|
||||||
SplitSink<
|
SplitSink<
|
||||||
|
@ -30,7 +30,9 @@ impl
|
||||||
WebSocketStream<MaybeTlsStream<TcpStream>>,
|
WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||||
> for DefaultGateway
|
> for DefaultGateway
|
||||||
{
|
{
|
||||||
fn get_heartbeat_handler(&self) -> &HeartbeatHandler {
|
fn get_heartbeat_handler(
|
||||||
|
&self,
|
||||||
|
) -> &HeartbeatHandler<Message, WebSocketStream<MaybeTlsStream<TcpStream>>> {
|
||||||
&self.heartbeat_handler
|
&self.heartbeat_handler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,7 +173,6 @@ impl DefaultGateway {
|
||||||
|
|
||||||
/// Deserializes and updates a dispatched event, when we already know its type;
|
/// Deserializes and updates a dispatched event, when we already know its type;
|
||||||
/// (Called for every event in handle_message)
|
/// (Called for every event in handle_message)
|
||||||
#[allow(dead_code)] // TODO: Remove this allow annotation
|
|
||||||
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
|
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
|
||||||
data: &'a str,
|
data: &'a str,
|
||||||
event: &mut GatewayEvent<T>,
|
event: &mut GatewayEvent<T>,
|
||||||
|
|
|
@ -1,50 +0,0 @@
|
||||||
use super::*;
|
|
||||||
|
|
||||||
// TODO: Make me not a trait and delete this file
|
|
||||||
#[async_trait]
|
|
||||||
impl
|
|
||||||
HeartbeatHandlerCapable<
|
|
||||||
tokio_tungstenite::tungstenite::Message,
|
|
||||||
WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
||||||
> for HeartbeatHandler
|
|
||||||
{
|
|
||||||
fn get_send(&self) -> &Sender<HeartbeatThreadCommunication> {
|
|
||||||
&self.send
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_heartbeat_interval(&self) -> Duration {
|
|
||||||
self.heartbeat_interval
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new(
|
|
||||||
heartbeat_interval: Duration,
|
|
||||||
websocket_tx: Arc<
|
|
||||||
Mutex<
|
|
||||||
SplitSink<
|
|
||||||
WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
||||||
tokio_tungstenite::tungstenite::Message,
|
|
||||||
>,
|
|
||||||
>,
|
|
||||||
>,
|
|
||||||
kill_rc: tokio::sync::broadcast::Receiver<()>,
|
|
||||||
) -> HeartbeatHandler {
|
|
||||||
let (send, receive) = tokio::sync::mpsc::channel(32);
|
|
||||||
let kill_receive = kill_rc.resubscribe();
|
|
||||||
|
|
||||||
let handle: JoinHandle<()> = task::spawn(async move {
|
|
||||||
HeartbeatHandler::heartbeat_task(
|
|
||||||
websocket_tx,
|
|
||||||
heartbeat_interval,
|
|
||||||
receive,
|
|
||||||
kill_receive,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
});
|
|
||||||
|
|
||||||
Self {
|
|
||||||
heartbeat_interval,
|
|
||||||
send,
|
|
||||||
handle,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,6 +1,5 @@
|
||||||
pub mod gateway;
|
pub mod gateway;
|
||||||
pub mod handle;
|
pub mod handle;
|
||||||
pub mod heartbeat;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
pub use gateway::*;
|
pub use gateway::*;
|
||||||
|
@ -15,16 +14,12 @@ use std::fmt::Debug;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures_util::stream::SplitSink;
|
use futures_util::stream::{SplitSink, SplitStream};
|
||||||
use futures_util::stream::SplitStream;
|
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use tokio::net::TcpStream;
|
use tokio::{net::TcpStream, sync::Mutex, task};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio_tungstenite::{
|
||||||
use tokio::sync::Mutex;
|
connect_async_tls_with_config, Connector, MaybeTlsStream, WebSocketStream,
|
||||||
use tokio::task;
|
};
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
use tokio_tungstenite::MaybeTlsStream;
|
|
||||||
use tokio_tungstenite::{connect_async_tls_with_config, Connector, WebSocketStream};
|
|
||||||
|
|
||||||
impl crate::gateway::MessageCapable for tokio_tungstenite::tungstenite::Message {
|
impl crate::gateway::MessageCapable for tokio_tungstenite::tungstenite::Message {
|
||||||
fn as_string(&self) -> Option<String> {
|
fn as_string(&self) -> Option<String> {
|
||||||
|
|
|
@ -10,7 +10,7 @@ pub mod wasm;
|
||||||
pub use default::*;
|
pub use default::*;
|
||||||
pub use message::*;
|
pub use message::*;
|
||||||
use safina_timer::sleep_until;
|
use safina_timer::sleep_until;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::{self, JoinHandle};
|
||||||
// TODO: Uncomment for Prod!
|
// TODO: Uncomment for Prod!
|
||||||
// #[cfg(all(target_arch = "wasm32", feature = "client"))]
|
// #[cfg(all(target_arch = "wasm32", feature = "client"))]
|
||||||
pub use wasm::*;
|
pub use wasm::*;
|
||||||
|
@ -25,6 +25,7 @@ use crate::types::{
|
||||||
|
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::marker::PhantomData;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::{self, Duration, Instant};
|
use std::time::{self, Duration, Instant};
|
||||||
|
|
||||||
|
@ -169,7 +170,7 @@ where
|
||||||
fn get_websocket_send(&self) -> Arc<Mutex<SplitSink<S, T>>>;
|
fn get_websocket_send(&self) -> Arc<Mutex<SplitSink<S, T>>>;
|
||||||
fn get_store(&self) -> GatewayStore;
|
fn get_store(&self) -> GatewayStore;
|
||||||
fn get_url(&self) -> String;
|
fn get_url(&self) -> String;
|
||||||
fn get_heartbeat_handler(&self) -> &HeartbeatHandler;
|
fn get_heartbeat_handler(&self) -> &HeartbeatHandler<T, S>;
|
||||||
/// Returns a Result with a matching impl of [`GatewayHandleCapable`], or a [`GatewayError`]
|
/// Returns a Result with a matching impl of [`GatewayHandleCapable`], or a [`GatewayError`]
|
||||||
///
|
///
|
||||||
/// DOCUMENTME: Explain what this method has to do to be a good get_handle() impl, or link to such documentation
|
/// DOCUMENTME: Explain what this method has to do to be a good get_handle() impl, or link to such documentation
|
||||||
|
@ -379,7 +380,7 @@ where
|
||||||
op_code: Some(GATEWAY_HEARTBEAT),
|
op_code: Some(GATEWAY_HEARTBEAT),
|
||||||
};
|
};
|
||||||
|
|
||||||
let heartbeat_thread_communicator = self.get_heartbeat_handler().get_send();
|
let heartbeat_thread_communicator = self.get_heartbeat_handler().send;
|
||||||
|
|
||||||
heartbeat_thread_communicator
|
heartbeat_thread_communicator
|
||||||
.send(heartbeat_communication)
|
.send(heartbeat_communication)
|
||||||
|
@ -408,7 +409,7 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
let heartbeat_handler = self.get_heartbeat_handler();
|
let heartbeat_handler = self.get_heartbeat_handler();
|
||||||
let heartbeat_thread_communicator = heartbeat_handler.get_send();
|
let heartbeat_thread_communicator = heartbeat_handler.send;
|
||||||
|
|
||||||
heartbeat_thread_communicator
|
heartbeat_thread_communicator
|
||||||
.send(heartbeat_communication)
|
.send(heartbeat_communication)
|
||||||
|
@ -441,7 +442,7 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
let heartbeat_handler = self.get_heartbeat_handler();
|
let heartbeat_handler = self.get_heartbeat_handler();
|
||||||
let heartbeat_thread_communicator = heartbeat_handler.get_send();
|
let heartbeat_thread_communicator = heartbeat_handler.send;
|
||||||
heartbeat_thread_communicator
|
heartbeat_thread_communicator
|
||||||
.send(heartbeat_communication)
|
.send(heartbeat_communication)
|
||||||
.await
|
.await
|
||||||
|
@ -559,19 +560,19 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles sending heartbeats to the gateway in another thread
|
/// Handles sending heartbeats to the gateway in another thread
|
||||||
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct HeartbeatHandler {
|
pub struct HeartbeatHandler<T: MessageCapable + Send + 'static, S: Sink<T>> {
|
||||||
/// How ofter heartbeats need to be sent at a minimum
|
/// How ofter heartbeats need to be sent at a minimum
|
||||||
pub heartbeat_interval: Duration,
|
pub heartbeat_interval: Duration,
|
||||||
/// The send channel for the heartbeat thread
|
/// The send channel for the heartbeat thread
|
||||||
pub send: Sender<HeartbeatThreadCommunication>,
|
pub send: Sender<HeartbeatThreadCommunication>,
|
||||||
/// The handle of the thread
|
/// The handle of the thread
|
||||||
handle: JoinHandle<()>,
|
handle: JoinHandle<()>,
|
||||||
|
hb_type: (PhantomData<T>, PhantomData<S>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HeartbeatHandler {
|
impl<T: MessageCapable + Send + 'static, S: Sink<T> + Send> HeartbeatHandler<T, S> {
|
||||||
pub async fn heartbeat_task<T: MessageCapable + Send + 'static, S: Sink<T> + Send>(
|
pub async fn heartbeat_task(
|
||||||
websocket_tx: Arc<Mutex<SplitSink<S, T>>>,
|
websocket_tx: Arc<Mutex<SplitSink<S, T>>>,
|
||||||
heartbeat_interval: Duration,
|
heartbeat_interval: Duration,
|
||||||
mut receive: tokio::sync::mpsc::Receiver<HeartbeatThreadCommunication>,
|
mut receive: tokio::sync::mpsc::Receiver<HeartbeatThreadCommunication>,
|
||||||
|
@ -651,17 +652,30 @@ impl HeartbeatHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
#[async_trait]
|
|
||||||
// TODO: Make me not a trait!!
|
|
||||||
pub trait HeartbeatHandlerCapable<T: MessageCapable + Send + 'static, S: Sink<T>> {
|
|
||||||
fn get_send(&self) -> &Sender<HeartbeatThreadCommunication>;
|
|
||||||
fn get_heartbeat_interval(&self) -> Duration;
|
|
||||||
#[allow(clippy::new_ret_no_self)]
|
|
||||||
// TODO: new() has duplicated code in wasm and default impl. Can be fixed, if this is not a trait
|
|
||||||
fn new(
|
fn new(
|
||||||
heartbeat_interval: Duration,
|
heartbeat_interval: Duration,
|
||||||
websocket_tx: Arc<Mutex<SplitSink<S, T>>>,
|
websocket_tx: Arc<Mutex<SplitSink<S, T>>>,
|
||||||
kill_rc: tokio::sync::broadcast::Receiver<()>,
|
kill_rc: tokio::sync::broadcast::Receiver<()>,
|
||||||
) -> HeartbeatHandler;
|
) -> HeartbeatHandler<T, S> {
|
||||||
|
let (send, receive) = tokio::sync::mpsc::channel(32);
|
||||||
|
let kill_receive = kill_rc.resubscribe();
|
||||||
|
|
||||||
|
let handle: JoinHandle<()> = task::spawn(async move {
|
||||||
|
HeartbeatHandler::heartbeat_task(
|
||||||
|
websocket_tx,
|
||||||
|
heartbeat_interval,
|
||||||
|
receive,
|
||||||
|
kill_receive,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
Self {
|
||||||
|
heartbeat_interval,
|
||||||
|
send,
|
||||||
|
handle,
|
||||||
|
hb_type: (PhantomData::<T>, PhantomData::<S>),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ use crate::types::{self, GatewayReceivePayload};
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct WasmGateway {
|
pub struct WasmGateway {
|
||||||
events: Arc<Mutex<Events>>,
|
events: Arc<Mutex<Events>>,
|
||||||
heartbeat_handler: HeartbeatHandler,
|
heartbeat_handler: HeartbeatHandler<WsMessage, WsStream>,
|
||||||
websocket_send: Arc<Mutex<SplitSink<WsStream, WsMessage>>>,
|
websocket_send: Arc<Mutex<SplitSink<WsStream, WsMessage>>>,
|
||||||
websocket_receive: SplitStream<WsStream>,
|
websocket_receive: SplitStream<WsStream>,
|
||||||
kill_send: tokio::sync::broadcast::Sender<()>,
|
kill_send: tokio::sync::broadcast::Sender<()>,
|
||||||
|
@ -110,7 +110,7 @@ impl GatewayCapable<WsMessage, WsStream> for WasmGateway {
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_heartbeat_handler(&self) -> &HeartbeatHandler {
|
fn get_heartbeat_handler(&self) -> &HeartbeatHandler<WsMessage, WsStream> {
|
||||||
&self.heartbeat_handler
|
&self.heartbeat_handler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,40 +0,0 @@
|
||||||
use tokio::task::{self, JoinHandle};
|
|
||||||
use ws_stream_wasm::*;
|
|
||||||
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl HeartbeatHandlerCapable<WsMessage, WsStream> for HeartbeatHandler {
|
|
||||||
fn get_send(&self) -> &Sender<HeartbeatThreadCommunication> {
|
|
||||||
&self.send
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_heartbeat_interval(&self) -> Duration {
|
|
||||||
self.heartbeat_interval
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new(
|
|
||||||
heartbeat_interval: Duration,
|
|
||||||
websocket_tx: Arc<Mutex<SplitSink<WsStream, WsMessage>>>,
|
|
||||||
kill_rc: tokio::sync::broadcast::Receiver<()>,
|
|
||||||
) -> HeartbeatHandler {
|
|
||||||
let (send, receive) = tokio::sync::mpsc::channel(32);
|
|
||||||
let kill_receive = kill_rc.resubscribe();
|
|
||||||
|
|
||||||
let handle: JoinHandle<()> = task::spawn(async move {
|
|
||||||
HeartbeatHandler::heartbeat_task(
|
|
||||||
websocket_tx,
|
|
||||||
heartbeat_interval,
|
|
||||||
receive,
|
|
||||||
kill_receive,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
});
|
|
||||||
|
|
||||||
Self {
|
|
||||||
heartbeat_interval,
|
|
||||||
send,
|
|
||||||
handle,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,10 +1,8 @@
|
||||||
pub mod gateway;
|
pub mod gateway;
|
||||||
pub mod handle;
|
pub mod handle;
|
||||||
pub mod heartbeat;
|
|
||||||
use super::*;
|
use super::*;
|
||||||
pub use gateway::*;
|
pub use gateway::*;
|
||||||
pub use handle::*;
|
pub use handle::*;
|
||||||
pub use heartbeat::*;
|
|
||||||
use ws_stream_wasm::WsMessage;
|
use ws_stream_wasm::WsMessage;
|
||||||
|
|
||||||
impl crate::gateway::MessageCapable for WsMessage {
|
impl crate::gateway::MessageCapable for WsMessage {
|
||||||
|
|
Loading…
Reference in New Issue