From 8e6daeff84325aa5a9a66881e47796061f3f30c9 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 30 Apr 2023 11:56:14 +0200 Subject: [PATCH 1/4] Add TLS support to WS connection --- src/gateway.rs | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 61f51ec..6df7686 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,26 +1,25 @@ -use std::sync::Arc; -use std::sync::Mutex; -use std::thread::JoinHandle; -use std::time::Duration; +use std::sync::{mpsc, Arc}; +use std::thread; use crate::api::types::*; use crate::api::WebSocketEvent; use crate::errors::ObserverError; use crate::gateway::events::Events; use crate::URLBundle; -use futures_util::SinkExt; + +use futures_util::stream::{FilterMap, SplitSink, SplitStream}; use futures_util::StreamExt; +use native_tls::TlsConnector; use reqwest::Url; use serde::Deserialize; use serde::Serialize; -use serde_json::from_str; -use serde_json::to_string; use tokio::io; use tokio::net::TcpStream; -use tokio_tungstenite::connect_async; +use tokio::sync::Mutex; use tokio_tungstenite::tungstenite::error::UrlError; -use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; +use tokio_tungstenite::{Connector, MaybeTlsStream}; /** Represents a Gateway connection. A Gateway connection will create observable @@ -31,13 +30,6 @@ pub struct Gateway<'a> { pub url: String, pub token: String, pub events: Events<'a>, - socket: WebSocketStream>, -} - -#[derive(Deserialize, Serialize, Debug)] -enum GatewayIntervalEvent { - Heartbeat(Option), - Hello(u64), } impl<'a> Gateway<'a> { @@ -51,7 +43,15 @@ impl<'a> Gateway<'a> { UrlError::UnsupportedUrlScheme, )); } - let (ws_stream, _) = match connect_async(websocket_url.clone()).await { + let (ws_stream, _) = match connect_async_tls_with_config( + websocket_url.clone(), + None, + Some(Connector::NativeTls( + TlsConnector::builder().build().unwrap(), + )), + ) + .await + { Ok(ws_stream) => ws_stream, Err(_) => { return Err(tokio_tungstenite::tungstenite::Error::Io( @@ -60,11 +60,12 @@ impl<'a> Gateway<'a> { } }; + let (mut write, read) = ws_stream.split(); + return Ok(Gateway { url: websocket_url, token, events: Events::default(), - socket: ws_stream, }); } } From 283e3fd9ac1c54c6129331f85d8e8f394bc79cb4 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 30 Apr 2023 12:17:35 +0200 Subject: [PATCH 2/4] Get GatewayHello from Stream --- src/gateway.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index 6df7686..571ac5a 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -13,6 +13,7 @@ use native_tls::TlsConnector; use reqwest::Url; use serde::Deserialize; use serde::Serialize; +use serde_json::from_str; use tokio::io; use tokio::net::TcpStream; use tokio::sync::Mutex; @@ -43,7 +44,7 @@ impl<'a> Gateway<'a> { UrlError::UnsupportedUrlScheme, )); } - let (ws_stream, _) = match connect_async_tls_with_config( + let (mut ws_stream, _) = match connect_async_tls_with_config( websocket_url.clone(), None, Some(Connector::NativeTls( @@ -59,6 +60,12 @@ impl<'a> Gateway<'a> { )) } }; + let hello_message: GatewayHello = match ws_stream.next().await.unwrap() { + Ok(message) => from_str(message.into_text().unwrap().as_str()).unwrap(), + Err(_) => panic!("AAAAAAA"), + }; + + println!("{}", hello_message.d.heartbeat_interval); let (mut write, read) = ws_stream.split(); From f95212b8032d94a08561c9c80dcaba944e79ba2f Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 30 Apr 2023 14:45:15 +0200 Subject: [PATCH 3/4] add example threaded websocketconnection --- src/gateway.rs | 99 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 571ac5a..818d14b 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,4 +1,4 @@ -use std::sync::{mpsc, Arc}; +use std::sync::Arc; use std::thread; use crate::api::types::*; @@ -8,6 +8,7 @@ use crate::gateway::events::Events; use crate::URLBundle; use futures_util::stream::{FilterMap, SplitSink, SplitStream}; +use futures_util::SinkExt; use futures_util::StreamExt; use native_tls::TlsConnector; use reqwest::Url; @@ -16,7 +17,9 @@ use serde::Serialize; use serde_json::from_str; use tokio::io; use tokio::net::TcpStream; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; +use tokio::task; use tokio_tungstenite::tungstenite::error::UrlError; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; @@ -38,37 +41,6 @@ impl<'a> Gateway<'a> { websocket_url: String, token: String, ) -> Result, tokio_tungstenite::tungstenite::Error> { - let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); - if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { - return Err(tokio_tungstenite::tungstenite::Error::Url( - UrlError::UnsupportedUrlScheme, - )); - } - let (mut ws_stream, _) = match connect_async_tls_with_config( - websocket_url.clone(), - None, - Some(Connector::NativeTls( - TlsConnector::builder().build().unwrap(), - )), - ) - .await - { - Ok(ws_stream) => ws_stream, - Err(_) => { - return Err(tokio_tungstenite::tungstenite::Error::Io( - io::ErrorKind::ConnectionAborted.into(), - )) - } - }; - let hello_message: GatewayHello = match ws_stream.next().await.unwrap() { - Ok(message) => from_str(message.into_text().unwrap().as_str()).unwrap(), - Err(_) => panic!("AAAAAAA"), - }; - - println!("{}", hello_message.d.heartbeat_interval); - - let (mut write, read) = ws_stream.split(); - return Ok(Gateway { url: websocket_url, token, @@ -77,6 +49,69 @@ impl<'a> Gateway<'a> { } } +struct WebSocketConnection { + rx: Arc>>, + tx: Arc>>, +} + +impl<'a> WebSocketConnection { + async fn new(websocket_url: String) -> WebSocketConnection { + let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); + /*if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { + return Err(tokio_tungstenite::tungstenite::Error::Url( + UrlError::UnsupportedUrlScheme, + )); + }*/ + + let (mut channel_write, mut channel_read): ( + Sender, + Receiver, + ) = channel(32); + + let shared_channel_write = Arc::new(Mutex::new(channel_write)); + let clone_shared_channel_write = shared_channel_write.clone(); + let shared_channel_read = Arc::new(Mutex::new(channel_read)); + let clone_shared_channel_read = shared_channel_read.clone(); + + task::spawn(async move { + let (mut ws_stream, _) = match connect_async_tls_with_config( + &websocket_url, + None, + Some(Connector::NativeTls( + TlsConnector::builder().build().unwrap(), + )), + ) + .await + { + Ok(ws_stream) => ws_stream, + Err(_) => return, /*return Err(tokio_tungstenite::tungstenite::Error::Io( + io::ErrorKind::ConnectionAborted.into(), + ))*/ + }; + + let (mut write_tx, mut write_rx) = ws_stream.split(); + + while let Some(msg) = shared_channel_read.lock().await.recv().await { + write_tx.send(msg).await.unwrap(); + } + + let event = while let Some(msg) = write_rx.next().await { + shared_channel_write + .lock() + .await + .send(msg.unwrap()) + .await + .unwrap(); + }; + }); + + WebSocketConnection { + tx: clone_shared_channel_write, + rx: clone_shared_channel_read, + } + } +} + /** Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to an Observable. The Observer is notified when the Observable's data changes. From a8e3ad0950eb286ef80e5cf6117a0776536d7c24 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 30 Apr 2023 21:54:15 +0200 Subject: [PATCH 4/4] add GatewayPayload --- src/api/types.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/api/types.rs b/src/api/types.rs index 4bdb6c4..2efbc1a 100644 --- a/src/api/types.rs +++ b/src/api/types.rs @@ -829,3 +829,15 @@ impl WebSocketEvent for GatewayHeartbeat {} pub struct GatewayHeartbeatAck { pub op: i32, } + +impl WebSocketEvent for GatewayHeartbeatAck {} + +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct GatewayPayload { + pub op: i32, + pub d: Option, + pub s: Option, + pub t: Option, +} + +impl WebSocketEvent for GatewayPayload {}