Merge pull request #24 from polyphony-chat/feature/gateway-observer

Feature/gateway observer
This commit is contained in:
Flori 2023-05-01 22:58:14 +02:00 committed by GitHub
commit fab6bb2027
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 84 additions and 29 deletions

View File

@ -829,3 +829,15 @@ impl WebSocketEvent for GatewayHeartbeat {}
pub struct GatewayHeartbeatAck { pub struct GatewayHeartbeatAck {
pub op: i32, pub op: i32,
} }
impl WebSocketEvent for GatewayHeartbeatAck {}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct GatewayPayload {
pub op: i32,
pub d: Option<String>,
pub s: Option<i64>,
pub t: Option<String>,
}
impl WebSocketEvent for GatewayPayload {}

View File

@ -1,26 +1,29 @@
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use crate::api::types::*; use crate::api::types::*;
use crate::api::WebSocketEvent; use crate::api::WebSocketEvent;
use crate::errors::ObserverError; use crate::errors::ObserverError;
use crate::gateway::events::Events; use crate::gateway::events::Events;
use crate::URLBundle; use crate::URLBundle;
use futures_util::stream::{FilterMap, SplitSink, SplitStream};
use futures_util::SinkExt; use futures_util::SinkExt;
use futures_util::StreamExt; use futures_util::StreamExt;
use native_tls::TlsConnector;
use reqwest::Url; use reqwest::Url;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use serde_json::from_str; use serde_json::from_str;
use serde_json::to_string;
use tokio::io; use tokio::io;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_tungstenite::connect_async; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task;
use tokio_tungstenite::tungstenite::error::UrlError; use tokio_tungstenite::tungstenite::error::UrlError;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{connect_async, connect_async_tls_with_config};
use tokio_tungstenite::{Connector, MaybeTlsStream};
/** /**
Represents a Gateway connection. A Gateway connection will create observable Represents a Gateway connection. A Gateway connection will create observable
@ -31,13 +34,6 @@ pub struct Gateway<'a> {
pub url: String, pub url: String,
pub token: String, pub token: String,
pub events: Events<'a>, pub events: Events<'a>,
socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
#[derive(Deserialize, Serialize, Debug)]
enum GatewayIntervalEvent {
Heartbeat(Option<u64>),
Hello(u64),
} }
impl<'a> Gateway<'a> { impl<'a> Gateway<'a> {
@ -45,30 +41,77 @@ impl<'a> Gateway<'a> {
websocket_url: String, websocket_url: String,
token: String, token: String,
) -> Result<Gateway<'a>, tokio_tungstenite::tungstenite::Error> { ) -> Result<Gateway<'a>, tokio_tungstenite::tungstenite::Error> {
let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap();
if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" {
return Err(tokio_tungstenite::tungstenite::Error::Url(
UrlError::UnsupportedUrlScheme,
));
}
let (ws_stream, _) = match connect_async(websocket_url.clone()).await {
Ok(ws_stream) => ws_stream,
Err(_) => {
return Err(tokio_tungstenite::tungstenite::Error::Io(
io::ErrorKind::ConnectionAborted.into(),
))
}
};
return Ok(Gateway { return Ok(Gateway {
url: websocket_url, url: websocket_url,
token, token,
events: Events::default(), events: Events::default(),
socket: ws_stream,
}); });
} }
} }
struct WebSocketConnection {
rx: Arc<Mutex<Receiver<tokio_tungstenite::tungstenite::Message>>>,
tx: Arc<Mutex<Sender<tokio_tungstenite::tungstenite::Message>>>,
}
impl<'a> WebSocketConnection {
async fn new(websocket_url: String) -> WebSocketConnection {
let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap();
/*if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" {
return Err(tokio_tungstenite::tungstenite::Error::Url(
UrlError::UnsupportedUrlScheme,
));
}*/
let (mut channel_write, mut channel_read): (
Sender<tokio_tungstenite::tungstenite::Message>,
Receiver<tokio_tungstenite::tungstenite::Message>,
) = channel(32);
let shared_channel_write = Arc::new(Mutex::new(channel_write));
let clone_shared_channel_write = shared_channel_write.clone();
let shared_channel_read = Arc::new(Mutex::new(channel_read));
let clone_shared_channel_read = shared_channel_read.clone();
task::spawn(async move {
let (mut ws_stream, _) = match connect_async_tls_with_config(
&websocket_url,
None,
Some(Connector::NativeTls(
TlsConnector::builder().build().unwrap(),
)),
)
.await
{
Ok(ws_stream) => ws_stream,
Err(_) => return, /*return Err(tokio_tungstenite::tungstenite::Error::Io(
io::ErrorKind::ConnectionAborted.into(),
))*/
};
let (mut write_tx, mut write_rx) = ws_stream.split();
while let Some(msg) = shared_channel_read.lock().await.recv().await {
write_tx.send(msg).await.unwrap();
}
let event = while let Some(msg) = write_rx.next().await {
shared_channel_write
.lock()
.await
.send(msg.unwrap())
.await
.unwrap();
};
});
WebSocketConnection {
tx: clone_shared_channel_write,
rx: clone_shared_channel_read,
}
}
}
/** /**
Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to
an Observable. The Observer is notified when the Observable's data changes. an Observable. The Observer is notified when the Observable's data changes.