diff --git a/src/api/types.rs b/src/api/types.rs index 4bdb6c4..2efbc1a 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -829,3 +829,15 @@ impl WebSocketEvent for GatewayHeartbeat {} pub struct GatewayHeartbeatAck { pub op: i32, } + +impl WebSocketEvent for GatewayHeartbeatAck {} + +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct GatewayPayload { + pub op: i32, + pub d: Option, + pub s: Option, + pub t: Option, +} + +impl WebSocketEvent for GatewayPayload {} diff --git a/src/gateway.rs b/src/gateway.rs index 61f51ec..818d14b 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,26 +1,29 @@ use std::sync::Arc; -use std::sync::Mutex; -use std::thread::JoinHandle; -use std::time::Duration; +use std::thread; use crate::api::types::*; use crate::api::WebSocketEvent; use crate::errors::ObserverError; use crate::gateway::events::Events; use crate::URLBundle; + +use futures_util::stream::{FilterMap, SplitSink, SplitStream}; use futures_util::SinkExt; use futures_util::StreamExt; +use native_tls::TlsConnector; use reqwest::Url; use serde::Deserialize; use serde::Serialize; use serde_json::from_str; -use serde_json::to_string; use tokio::io; use tokio::net::TcpStream; -use tokio_tungstenite::connect_async; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::Mutex; +use tokio::task; use tokio_tungstenite::tungstenite::error::UrlError; -use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; +use tokio_tungstenite::{Connector, MaybeTlsStream}; /** Represents a Gateway connection. A Gateway connection will create observable @@ -31,13 +34,6 @@ pub struct Gateway<'a> { pub url: String, pub token: String, pub events: Events<'a>, - socket: WebSocketStream>, -} - -#[derive(Deserialize, Serialize, Debug)] -enum GatewayIntervalEvent { - Heartbeat(Option), - Hello(u64), } impl<'a> Gateway<'a> { @@ -45,30 +41,77 @@ impl<'a> Gateway<'a> { websocket_url: String, token: String, ) -> Result, tokio_tungstenite::tungstenite::Error> { - let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); - if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { - return Err(tokio_tungstenite::tungstenite::Error::Url( - UrlError::UnsupportedUrlScheme, - )); - } - let (ws_stream, _) = match connect_async(websocket_url.clone()).await { - Ok(ws_stream) => ws_stream, - Err(_) => { - return Err(tokio_tungstenite::tungstenite::Error::Io( - io::ErrorKind::ConnectionAborted.into(), - )) - } - }; - return Ok(Gateway { url: websocket_url, token, events: Events::default(), - socket: ws_stream, }); } } +struct WebSocketConnection { + rx: Arc>>, + tx: Arc>>, +} + +impl<'a> WebSocketConnection { + async fn new(websocket_url: String) -> WebSocketConnection { + let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); + /*if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { + return Err(tokio_tungstenite::tungstenite::Error::Url( + UrlError::UnsupportedUrlScheme, + )); + }*/ + + let (mut channel_write, mut channel_read): ( + Sender, + Receiver, + ) = channel(32); + + let shared_channel_write = Arc::new(Mutex::new(channel_write)); + let clone_shared_channel_write = shared_channel_write.clone(); + let shared_channel_read = Arc::new(Mutex::new(channel_read)); + let clone_shared_channel_read = shared_channel_read.clone(); + + task::spawn(async move { + let (mut ws_stream, _) = match connect_async_tls_with_config( + &websocket_url, + None, + Some(Connector::NativeTls( + TlsConnector::builder().build().unwrap(), + )), + ) + .await + { + Ok(ws_stream) => ws_stream, + Err(_) => return, /*return Err(tokio_tungstenite::tungstenite::Error::Io( + io::ErrorKind::ConnectionAborted.into(), + ))*/ + }; + + let (mut write_tx, mut write_rx) = ws_stream.split(); + + while let Some(msg) = shared_channel_read.lock().await.recv().await { + write_tx.send(msg).await.unwrap(); + } + + let event = while let Some(msg) = write_rx.next().await { + shared_channel_write + .lock() + .await + .send(msg.unwrap()) + .await + .unwrap(); + }; + }); + + WebSocketConnection { + tx: clone_shared_channel_write, + rx: clone_shared_channel_read, + } + } +} + /** Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to an Observable. The Observer is notified when the Observable's data changes.