From f95212b8032d94a08561c9c80dcaba944e79ba2f Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 30 Apr 2023 14:45:15 +0200 Subject: [PATCH] add example threaded websocketconnection --- src/gateway.rs | 99 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 32 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 571ac5a..818d14b 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,4 +1,4 @@ -use std::sync::{mpsc, Arc}; +use std::sync::Arc; use std::thread; use crate::api::types::*; @@ -8,6 +8,7 @@ use crate::gateway::events::Events; use crate::URLBundle; use futures_util::stream::{FilterMap, SplitSink, SplitStream}; +use futures_util::SinkExt; use futures_util::StreamExt; use native_tls::TlsConnector; use reqwest::Url; @@ -16,7 +17,9 @@ use serde::Serialize; use serde_json::from_str; use tokio::io; use tokio::net::TcpStream; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; +use tokio::task; use tokio_tungstenite::tungstenite::error::UrlError; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; @@ -38,37 +41,6 @@ impl<'a> Gateway<'a> { websocket_url: String, token: String, ) -> Result, tokio_tungstenite::tungstenite::Error> { - let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); - if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { - return Err(tokio_tungstenite::tungstenite::Error::Url( - UrlError::UnsupportedUrlScheme, - )); - } - let (mut ws_stream, _) = match connect_async_tls_with_config( - websocket_url.clone(), - None, - Some(Connector::NativeTls( - TlsConnector::builder().build().unwrap(), - )), - ) - .await - { - Ok(ws_stream) => ws_stream, - Err(_) => { - return Err(tokio_tungstenite::tungstenite::Error::Io( - io::ErrorKind::ConnectionAborted.into(), - )) - } - }; - let hello_message: GatewayHello = match ws_stream.next().await.unwrap() { - Ok(message) => from_str(message.into_text().unwrap().as_str()).unwrap(), - Err(_) => panic!("AAAAAAA"), - }; - - println!("{}", hello_message.d.heartbeat_interval); - - let (mut write, read) = ws_stream.split(); - return Ok(Gateway { url: websocket_url, token, @@ -77,6 +49,69 @@ impl<'a> Gateway<'a> { } } +struct WebSocketConnection { + rx: Arc>>, + tx: Arc>>, +} + +impl<'a> WebSocketConnection { + async fn new(websocket_url: String) -> WebSocketConnection { + let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap(); + /*if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" { + return Err(tokio_tungstenite::tungstenite::Error::Url( + UrlError::UnsupportedUrlScheme, + )); + }*/ + + let (mut channel_write, mut channel_read): ( + Sender, + Receiver, + ) = channel(32); + + let shared_channel_write = Arc::new(Mutex::new(channel_write)); + let clone_shared_channel_write = shared_channel_write.clone(); + let shared_channel_read = Arc::new(Mutex::new(channel_read)); + let clone_shared_channel_read = shared_channel_read.clone(); + + task::spawn(async move { + let (mut ws_stream, _) = match connect_async_tls_with_config( + &websocket_url, + None, + Some(Connector::NativeTls( + TlsConnector::builder().build().unwrap(), + )), + ) + .await + { + Ok(ws_stream) => ws_stream, + Err(_) => return, /*return Err(tokio_tungstenite::tungstenite::Error::Io( + io::ErrorKind::ConnectionAborted.into(), + ))*/ + }; + + let (mut write_tx, mut write_rx) = ws_stream.split(); + + while let Some(msg) = shared_channel_read.lock().await.recv().await { + write_tx.send(msg).await.unwrap(); + } + + let event = while let Some(msg) = write_rx.next().await { + shared_channel_write + .lock() + .await + .send(msg.unwrap()) + .await + .unwrap(); + }; + }); + + WebSocketConnection { + tx: clone_shared_channel_write, + rx: clone_shared_channel_read, + } + } +} + /** Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to an Observable. The Observer is notified when the Observable's data changes.