Impl spawn() for wasm gateway

This commit is contained in:
bitfl0wer 2023-11-19 12:51:07 +01:00
parent 9d88f50bf7
commit c87ad0ea6f
No known key found for this signature in database
GPG Key ID: 0ACD574FCF5226CF
1 changed files with 66 additions and 9 deletions

View File

@ -1,11 +1,14 @@
use std::sync::Arc;
use std::u8;
use super::events::Events;
use super::*;
use pharos::*;
use futures_util::StreamExt;
use tokio::task;
use tokio_stream::StreamExt;
use ws_stream_wasm::*;
use crate::types::{self, WebSocketEvent};
use crate::types::{self, GatewayReceivePayload, WebSocketEvent};
#[derive(Debug)]
pub struct WasmGateway {
events: Arc<Mutex<Events>>,
@ -37,19 +40,73 @@ impl GatewayCapable<WsMessage, WsStream> for WasmGateway {
async fn spawn<G: GatewayHandleCapable<WsMessage, WsStream>>(
websocket_url: String,
) -> Result<G, GatewayError> {
let (mut websocket_stream, _) = match WsMeta::connect(websocket_url, None).await {
let (_, mut websocket_stream) = match WsMeta::connect(websocket_url.clone(), None).await {
Ok(ws) => Ok(ws),
Err(e) => Err(GatewayError::CannotConnect {
error: e.to_string(),
}),
}?;
let mut event = match websocket_stream
.observe(ObserveConfig::channel(self, Channel::Unbounded))
.await
{
Ok(ok) => Ok(ok),
Err(e) => Err(GatewayError::CannotConnect { error: e }),
let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16);
let (websocket_send, mut websocket_receive) = websocket_stream.split();
let shared_websocket_send = Arc::new(Mutex::new(websocket_send));
let msg = match websocket_receive.next().await {
Some(msg) => match msg {
WsMessage::Text(text) => Ok(text),
WsMessage::Binary(vec) => Err(GatewayError::NonHelloOnInitiate {
opcode: vec.into_iter().next().unwrap_or(u8::MIN),
}),
},
None => Err(GatewayError::CannotConnect {
error: "No 'Hello' message received!".to_string(),
}),
}?;
let payload: GatewayReceivePayload = match serde_json::from_str(msg.as_str()) {
Ok(msg) => Ok(msg),
Err(_) => Err(GatewayError::Decode),
}?;
if payload.op_code != GATEWAY_HELLO {
return Err(GatewayError::NonHelloOnInitiate {
opcode: payload.op_code,
});
};
info!("GW: Received Hello");
let gateway_hello: types::HelloData =
serde_json::from_str(payload.event_data.unwrap().get()).unwrap();
let events = Events::default();
let shared_events: Arc<Mutex<Events>> = Arc::new(Mutex::new(events));
let store: GatewayStore = Arc::new(Mutex::new(HashMap::new()));
let mut gateway = WasmGateway {
events: shared_events.clone(),
heartbeat_handler: todo!(),
websocket_send: shared_websocket_send.clone(),
websocket_receive,
store: store.clone(),
url: websocket_url.clone(),
};
task::spawn_local(async move {
gateway.gateway_listen_task().await;
});
Ok(G::new(
websocket_url.clone(),
shared_events.clone(),
shared_websocket_send.clone(),
kill_send.clone(),
store,
))
}
}
impl WasmGateway {
async fn gateway_listen_task(&mut self) {
todo!()
}
}