Merge pull request #21 from polyphony-chat/feature/gateway-observer

Feature/gateway observer
This commit is contained in:
Flori 2023-04-30 11:56:22 +02:00 committed by GitHub
commit f9fd84a58c
3 changed files with 75 additions and 19 deletions

View File

@ -15,3 +15,4 @@ regex = "1.7.3"
custom_error = "1.9.2"
native-tls = "0.2.11"
tokio-tungstenite = {version = "0.18.0", features = ["native-tls"]}
futures-util = "0.3.28"

View File

@ -801,3 +801,31 @@ pub struct GatewayResume {
}
impl WebSocketEvent for GatewayResume {}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct GatewayHello {
pub op: i32,
pub d: HelloData,
}
impl WebSocketEvent for GatewayHello {}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct HelloData {
pub heartbeat_interval: i32,
}
impl WebSocketEvent for HelloData {}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct GatewayHeartbeat {
pub op: u8,
pub d: u64,
}
impl WebSocketEvent for GatewayHeartbeat {}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct GatewayHeartbeatAck {
pub op: i32,
}

View File

@ -1,16 +1,24 @@
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::JoinHandle;
use std::time::Duration;
use crate::api::types::*;
use crate::api::WebSocketEvent;
use crate::errors::ObserverError;
use crate::gateway::events::Events;
use crate::URLBundle;
use futures_util::SinkExt;
use futures_util::StreamExt;
use reqwest::Url;
use serde::Deserialize;
use serde::Serialize;
use serde_json::from_str;
use serde_json::to_string;
use tokio::io;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::error::UrlError;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::WebSocketStream;
@ -21,31 +29,43 @@ implemented [Types] with the trait [`WebSocketEvent`]
*/
pub struct Gateway<'a> {
pub url: String,
pub token: String,
pub events: Events<'a>,
socket: Arc<Mutex<Option<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
thread_handle: Option<JoinHandle<()>>,
socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
#[derive(Deserialize, Serialize, Debug)]
enum GatewayIntervalEvent {
Heartbeat(Option<u64>),
Hello(u64),
}
impl<'a> Gateway<'a> {
pub async fn new(websocket_url: String, token: String) {
pub async fn new(
websocket_url: String,
token: String,
) -> Result<Gateway<'a>, tokio_tungstenite::tungstenite::Error> {
let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap();
if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" {
//return Err(Error::Url(UrlError::UnsupportedUrlScheme));
return Err(tokio_tungstenite::tungstenite::Error::Url(
UrlError::UnsupportedUrlScheme,
));
}
let (ws_stream, _) = match connect_async(websocket_url.clone()).await {
Ok(ws_stream) => ws_stream,
Err(_) => {
return Err(tokio_tungstenite::tungstenite::Error::Io(
io::ErrorKind::ConnectionAborted.into(),
))
}
let payload = GatewayIdentifyPayload {
token: token,
properties: GatewayIdentifyConnectionProps {
os: "any".to_string(),
browser: "chorus-polyphony".to_string(),
device: "chorus-lib".to_string(),
},
compress: Some(true),
large_threshold: None,
shard: None,
presence: None,
intents: 3276799,
};
let payload_string = to_string(&payload).unwrap();
return Ok(Gateway {
url: websocket_url,
token,
events: Events::default(),
socket: ws_stream,
});
}
}
@ -200,4 +220,11 @@ mod example {
Some(err) => println!("You cannot subscribe twice: {}", err),
}
}
#[tokio::test]
async fn test_gateway() {
let gateway = Gateway::new("ws://localhost:3001/".to_string(), "none".to_string())
.await
.unwrap();
}
}