diff --git a/src/gateway/wasm/gateway.rs b/src/gateway/wasm/gateway.rs index bb4573c..cdbb991 100644 --- a/src/gateway/wasm/gateway.rs +++ b/src/gateway/wasm/gateway.rs @@ -3,18 +3,19 @@ use std::u8; use super::events::Events; use super::*; +use futures_util::stream::SplitStream; use futures_util::StreamExt; use tokio::task; -use tokio_stream::StreamExt; use ws_stream_wasm::*; -use crate::types::{self, GatewayReceivePayload, WebSocketEvent}; +use crate::types::{self, GatewayReceivePayload}; #[derive(Debug)] pub struct WasmGateway { events: Arc>, heartbeat_handler: HeartbeatHandler, websocket_send: Arc>>, websocket_receive: SplitStream, + kill_send: tokio::sync::broadcast::Sender<()>, store: GatewayStore, url: String, } @@ -84,10 +85,15 @@ impl GatewayCapable for WasmGateway { let mut gateway = WasmGateway { events: shared_events.clone(), - heartbeat_handler: todo!(), + heartbeat_handler: HeartbeatHandler::new( + Duration::from_millis(gateway_hello.heartbeat_interval), + shared_websocket_send.clone(), + kill_send.subscribe(), + ), websocket_send: shared_websocket_send.clone(), websocket_receive, store: store.clone(), + kill_send: kill_send.clone(), url: websocket_url.clone(), }; @@ -103,6 +109,15 @@ impl GatewayCapable for WasmGateway { store, )) } + + fn get_heartbeat_handler(&self) -> &HeartbeatHandler { + &self.heartbeat_handler + } + + async fn close(&mut self) { + self.kill_send.send(()).unwrap(); + self.websocket_send.lock().await.close().await.unwrap(); + } } impl WasmGateway {