diff --git a/src/gateway/wasm/gateway.rs b/src/gateway/wasm/gateway.rs index f3bd3d7..bb4573c 100644 --- a/src/gateway/wasm/gateway.rs +++ b/src/gateway/wasm/gateway.rs @@ -1,11 +1,14 @@ use std::sync::Arc; +use std::u8; use super::events::Events; use super::*; -use pharos::*; +use futures_util::StreamExt; +use tokio::task; +use tokio_stream::StreamExt; use ws_stream_wasm::*; -use crate::types::{self, WebSocketEvent}; +use crate::types::{self, GatewayReceivePayload, WebSocketEvent}; #[derive(Debug)] pub struct WasmGateway { events: Arc>, @@ -37,19 +40,73 @@ impl GatewayCapable for WasmGateway { async fn spawn>( websocket_url: String, ) -> Result { - let (mut websocket_stream, _) = match WsMeta::connect(websocket_url, None).await { + let (_, mut websocket_stream) = match WsMeta::connect(websocket_url.clone(), None).await { Ok(ws) => Ok(ws), Err(e) => Err(GatewayError::CannotConnect { error: e.to_string(), }), }?; - let mut event = match websocket_stream - .observe(ObserveConfig::channel(self, Channel::Unbounded)) - .await - { - Ok(ok) => Ok(ok), - Err(e) => Err(GatewayError::CannotConnect { error: e }), + let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16); + let (websocket_send, mut websocket_receive) = websocket_stream.split(); + let shared_websocket_send = Arc::new(Mutex::new(websocket_send)); + + let msg = match websocket_receive.next().await { + Some(msg) => match msg { + WsMessage::Text(text) => Ok(text), + WsMessage::Binary(vec) => Err(GatewayError::NonHelloOnInitiate { + opcode: vec.into_iter().next().unwrap_or(u8::MIN), + }), + }, + None => Err(GatewayError::CannotConnect { + error: "No 'Hello' message received!".to_string(), + }), }?; + + let payload: GatewayReceivePayload = match serde_json::from_str(msg.as_str()) { + Ok(msg) => Ok(msg), + Err(_) => Err(GatewayError::Decode), + }?; + if payload.op_code != GATEWAY_HELLO { + return Err(GatewayError::NonHelloOnInitiate { + opcode: payload.op_code, + }); + }; + + info!("GW: Received Hello"); + + let gateway_hello: types::HelloData = + serde_json::from_str(payload.event_data.unwrap().get()).unwrap(); + + let events = Events::default(); + let shared_events: Arc> = Arc::new(Mutex::new(events)); + let store: GatewayStore = Arc::new(Mutex::new(HashMap::new())); + + let mut gateway = WasmGateway { + events: shared_events.clone(), + heartbeat_handler: todo!(), + websocket_send: shared_websocket_send.clone(), + websocket_receive, + store: store.clone(), + url: websocket_url.clone(), + }; + + task::spawn_local(async move { + gateway.gateway_listen_task().await; + }); + + Ok(G::new( + websocket_url.clone(), + shared_events.clone(), + shared_websocket_send.clone(), + kill_send.clone(), + store, + )) + } +} + +impl WasmGateway { + async fn gateway_listen_task(&mut self) { + todo!() } }