add example threaded websocketconnection

This commit is contained in:
bitfl0wer 2023-04-30 14:45:15 +02:00
parent 283e3fd9ac
commit f95212b803
1 changed files with 67 additions and 32 deletions

View File

@ -1,4 +1,4 @@
use std::sync::{mpsc, Arc}; use std::sync::Arc;
use std::thread; use std::thread;
use crate::api::types::*; use crate::api::types::*;
@ -8,6 +8,7 @@ use crate::gateway::events::Events;
use crate::URLBundle; use crate::URLBundle;
use futures_util::stream::{FilterMap, SplitSink, SplitStream}; use futures_util::stream::{FilterMap, SplitSink, SplitStream};
use futures_util::SinkExt;
use futures_util::StreamExt; use futures_util::StreamExt;
use native_tls::TlsConnector; use native_tls::TlsConnector;
use reqwest::Url; use reqwest::Url;
@ -16,7 +17,9 @@ use serde::Serialize;
use serde_json::from_str; use serde_json::from_str;
use tokio::io; use tokio::io;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::task;
use tokio_tungstenite::tungstenite::error::UrlError; use tokio_tungstenite::tungstenite::error::UrlError;
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{connect_async, connect_async_tls_with_config}; use tokio_tungstenite::{connect_async, connect_async_tls_with_config};
@ -38,37 +41,6 @@ impl<'a> Gateway<'a> {
websocket_url: String, websocket_url: String,
token: String, token: String,
) -> Result<Gateway<'a>, tokio_tungstenite::tungstenite::Error> { ) -> Result<Gateway<'a>, tokio_tungstenite::tungstenite::Error> {
let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap();
if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" {
return Err(tokio_tungstenite::tungstenite::Error::Url(
UrlError::UnsupportedUrlScheme,
));
}
let (mut ws_stream, _) = match connect_async_tls_with_config(
websocket_url.clone(),
None,
Some(Connector::NativeTls(
TlsConnector::builder().build().unwrap(),
)),
)
.await
{
Ok(ws_stream) => ws_stream,
Err(_) => {
return Err(tokio_tungstenite::tungstenite::Error::Io(
io::ErrorKind::ConnectionAborted.into(),
))
}
};
let hello_message: GatewayHello = match ws_stream.next().await.unwrap() {
Ok(message) => from_str(message.into_text().unwrap().as_str()).unwrap(),
Err(_) => panic!("AAAAAAA"),
};
println!("{}", hello_message.d.heartbeat_interval);
let (mut write, read) = ws_stream.split();
return Ok(Gateway { return Ok(Gateway {
url: websocket_url, url: websocket_url,
token, token,
@ -77,6 +49,69 @@ impl<'a> Gateway<'a> {
} }
} }
struct WebSocketConnection {
rx: Arc<Mutex<Receiver<tokio_tungstenite::tungstenite::Message>>>,
tx: Arc<Mutex<Sender<tokio_tungstenite::tungstenite::Message>>>,
}
impl<'a> WebSocketConnection {
async fn new(websocket_url: String) -> WebSocketConnection {
let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap();
/*if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" {
return Err(tokio_tungstenite::tungstenite::Error::Url(
UrlError::UnsupportedUrlScheme,
));
}*/
let (mut channel_write, mut channel_read): (
Sender<tokio_tungstenite::tungstenite::Message>,
Receiver<tokio_tungstenite::tungstenite::Message>,
) = channel(32);
let shared_channel_write = Arc::new(Mutex::new(channel_write));
let clone_shared_channel_write = shared_channel_write.clone();
let shared_channel_read = Arc::new(Mutex::new(channel_read));
let clone_shared_channel_read = shared_channel_read.clone();
task::spawn(async move {
let (mut ws_stream, _) = match connect_async_tls_with_config(
&websocket_url,
None,
Some(Connector::NativeTls(
TlsConnector::builder().build().unwrap(),
)),
)
.await
{
Ok(ws_stream) => ws_stream,
Err(_) => return, /*return Err(tokio_tungstenite::tungstenite::Error::Io(
io::ErrorKind::ConnectionAborted.into(),
))*/
};
let (mut write_tx, mut write_rx) = ws_stream.split();
while let Some(msg) = shared_channel_read.lock().await.recv().await {
write_tx.send(msg).await.unwrap();
}
let event = while let Some(msg) = write_rx.next().await {
shared_channel_write
.lock()
.await
.send(msg.unwrap())
.await
.unwrap();
};
});
WebSocketConnection {
tx: clone_shared_channel_write,
rx: clone_shared_channel_read,
}
}
}
/** /**
Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to
an Observable. The Observer is notified when the Observable's data changes. an Observable. The Observer is notified when the Observable's data changes.