From 9ae07a15d70c790a962acbec87e803a522e2e043 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 19 Nov 2023 01:31:04 +0100 Subject: [PATCH] start implementing wasm gateway --- Cargo.lock | 1 + Cargo.toml | 2 ++ src/gateway/wasm/gateway.rs | 39 +++++++++++++++++++++++++++++++++++++ src/gateway/wasm/handle.rs | 34 ++++++++++++++++++++++++++++++++ src/gateway/wasm/mod.rs | 6 ++++++ 5 files changed, 82 insertions(+) create mode 100644 src/gateway/wasm/handle.rs diff --git a/Cargo.lock b/Cargo.lock index ba6758d..f7379c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -205,6 +205,7 @@ dependencies = [ "lazy_static", "log", "native-tls", + "pharos", "poem", "rand", "regex", diff --git a/Cargo.toml b/Cargo.toml index 433ca85..0907694 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,8 @@ hostname = "0.3.1" getrandom = { version = "0.2.11", features = ["js"] } tokio-tungstenite = { version = "0.20.1", default-features = false } ws_stream_wasm = "0.7.4" +pharos = "0.5.3" + [dev-dependencies] lazy_static = "1.4.0" diff --git a/src/gateway/wasm/gateway.rs b/src/gateway/wasm/gateway.rs index 8b30e7d..f3bd3d7 100644 --- a/src/gateway/wasm/gateway.rs +++ b/src/gateway/wasm/gateway.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use super::events::Events; use super::*; +use pharos::*; use ws_stream_wasm::*; use crate::types::{self, WebSocketEvent}; @@ -14,3 +15,41 @@ pub struct WasmGateway { store: GatewayStore, url: String, } + +#[async_trait] +impl GatewayCapable for WasmGateway { + fn get_events(&self) -> Arc> { + self.events.clone() + } + + fn get_websocket_send(&self) -> Arc>> { + self.websocket_send.clone() + } + + fn get_store(&self) -> GatewayStore { + self.store.clone() + } + + fn get_url(&self) -> String { + self.url.clone() + } + + async fn spawn>( + websocket_url: String, + ) -> Result { + let (mut websocket_stream, _) = match WsMeta::connect(websocket_url, None).await { + Ok(ws) => Ok(ws), + Err(e) => Err(GatewayError::CannotConnect { + error: e.to_string(), + }), + }?; + + let mut event = match websocket_stream + .observe(ObserveConfig::channel(self, Channel::Unbounded)) + .await + { + Ok(ok) => Ok(ok), + Err(e) => Err(GatewayError::CannotConnect { error: e }), + }?; + } +} diff --git a/src/gateway/wasm/handle.rs b/src/gateway/wasm/handle.rs new file mode 100644 index 0000000..c9ad14d --- /dev/null +++ b/src/gateway/wasm/handle.rs @@ -0,0 +1,34 @@ +use super::*; +use std::sync::Arc; + +use crate::gateway::GatewayHandleCapable; +use ws_stream_wasm::*; + +#[derive(Debug, Clone)] +pub struct WasmGatewayHandle { + pub url: String, + pub events: Arc>, + pub websocket_send: Arc>>, + /// Tells gateway tasks to close + pub(super) kill_send: tokio::sync::broadcast::Sender<()>, + pub(crate) store: GatewayStore, +} + +#[async_trait] +impl GatewayHandleCapable for WasmGatewayHandle { + fn new( + url: String, + events: Arc>, + websocket_send: Arc>>, + kill_send: tokio::sync::broadcast::Sender<()>, + store: GatewayStore, + ) -> Self { + Self { + url, + events, + websocket_send, + kill_send, + store, + } + } +} diff --git a/src/gateway/wasm/mod.rs b/src/gateway/wasm/mod.rs index aaab48f..6cb87f2 100644 --- a/src/gateway/wasm/mod.rs +++ b/src/gateway/wasm/mod.rs @@ -1,7 +1,9 @@ pub mod gateway; +pub mod handle; pub mod heartbeat; use super::*; pub use gateway::*; +pub use handle::*; pub use heartbeat::*; use ws_stream_wasm::WsMessage; @@ -27,4 +29,8 @@ impl crate::gateway::MessageCapable for WsMessage { _ => false, } } + + fn from_str(s: &str) -> Self { + WsMessage::Text(s.to_string()) + } }