Merge pull request #24 from polyphony-chat/feature/gateway-observer
Feature/gateway observer
This commit is contained in:
commit
05fccce18e
|
@ -829,3 +829,15 @@ impl WebSocketEvent for GatewayHeartbeat {}
|
|||
pub struct GatewayHeartbeatAck {
|
||||
pub op: i32,
|
||||
}
|
||||
|
||||
impl WebSocketEvent for GatewayHeartbeatAck {}
|
||||
|
||||
#[derive(Debug, Default, Deserialize, Serialize)]
|
||||
pub struct GatewayPayload {
|
||||
pub op: i32,
|
||||
pub d: Option<String>,
|
||||
pub s: Option<i64>,
|
||||
pub t: Option<String>,
|
||||
}
|
||||
|
||||
impl WebSocketEvent for GatewayPayload {}
|
||||
|
|
101
src/gateway.rs
101
src/gateway.rs
|
@ -1,26 +1,29 @@
|
|||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
use std::thread;
|
||||
|
||||
use crate::api::types::*;
|
||||
use crate::api::WebSocketEvent;
|
||||
use crate::errors::ObserverError;
|
||||
use crate::gateway::events::Events;
|
||||
use crate::URLBundle;
|
||||
|
||||
use futures_util::stream::{FilterMap, SplitSink, SplitStream};
|
||||
use futures_util::SinkExt;
|
||||
use futures_util::StreamExt;
|
||||
use native_tls::TlsConnector;
|
||||
use reqwest::Url;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::from_str;
|
||||
use serde_json::to_string;
|
||||
use tokio::io;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::connect_async;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task;
|
||||
use tokio_tungstenite::tungstenite::error::UrlError;
|
||||
use tokio_tungstenite::MaybeTlsStream;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::{connect_async, connect_async_tls_with_config};
|
||||
use tokio_tungstenite::{Connector, MaybeTlsStream};
|
||||
|
||||
/**
|
||||
Represents a Gateway connection. A Gateway connection will create observable
|
||||
|
@ -31,13 +34,6 @@ pub struct Gateway<'a> {
|
|||
pub url: String,
|
||||
pub token: String,
|
||||
pub events: Events<'a>,
|
||||
socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
enum GatewayIntervalEvent {
|
||||
Heartbeat(Option<u64>),
|
||||
Hello(u64),
|
||||
}
|
||||
|
||||
impl<'a> Gateway<'a> {
|
||||
|
@ -45,30 +41,77 @@ impl<'a> Gateway<'a> {
|
|||
websocket_url: String,
|
||||
token: String,
|
||||
) -> Result<Gateway<'a>, tokio_tungstenite::tungstenite::Error> {
|
||||
let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap();
|
||||
if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" {
|
||||
return Err(tokio_tungstenite::tungstenite::Error::Url(
|
||||
UrlError::UnsupportedUrlScheme,
|
||||
));
|
||||
}
|
||||
let (ws_stream, _) = match connect_async(websocket_url.clone()).await {
|
||||
Ok(ws_stream) => ws_stream,
|
||||
Err(_) => {
|
||||
return Err(tokio_tungstenite::tungstenite::Error::Io(
|
||||
io::ErrorKind::ConnectionAborted.into(),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
return Ok(Gateway {
|
||||
url: websocket_url,
|
||||
token,
|
||||
events: Events::default(),
|
||||
socket: ws_stream,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
struct WebSocketConnection {
|
||||
rx: Arc<Mutex<Receiver<tokio_tungstenite::tungstenite::Message>>>,
|
||||
tx: Arc<Mutex<Sender<tokio_tungstenite::tungstenite::Message>>>,
|
||||
}
|
||||
|
||||
impl<'a> WebSocketConnection {
|
||||
async fn new(websocket_url: String) -> WebSocketConnection {
|
||||
let parsed_url = Url::parse(&URLBundle::parse_url(websocket_url.clone())).unwrap();
|
||||
/*if parsed_url.scheme() != "ws" && parsed_url.scheme() != "wss" {
|
||||
return Err(tokio_tungstenite::tungstenite::Error::Url(
|
||||
UrlError::UnsupportedUrlScheme,
|
||||
));
|
||||
}*/
|
||||
|
||||
let (mut channel_write, mut channel_read): (
|
||||
Sender<tokio_tungstenite::tungstenite::Message>,
|
||||
Receiver<tokio_tungstenite::tungstenite::Message>,
|
||||
) = channel(32);
|
||||
|
||||
let shared_channel_write = Arc::new(Mutex::new(channel_write));
|
||||
let clone_shared_channel_write = shared_channel_write.clone();
|
||||
let shared_channel_read = Arc::new(Mutex::new(channel_read));
|
||||
let clone_shared_channel_read = shared_channel_read.clone();
|
||||
|
||||
task::spawn(async move {
|
||||
let (mut ws_stream, _) = match connect_async_tls_with_config(
|
||||
&websocket_url,
|
||||
None,
|
||||
Some(Connector::NativeTls(
|
||||
TlsConnector::builder().build().unwrap(),
|
||||
)),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(ws_stream) => ws_stream,
|
||||
Err(_) => return, /*return Err(tokio_tungstenite::tungstenite::Error::Io(
|
||||
io::ErrorKind::ConnectionAborted.into(),
|
||||
))*/
|
||||
};
|
||||
|
||||
let (mut write_tx, mut write_rx) = ws_stream.split();
|
||||
|
||||
while let Some(msg) = shared_channel_read.lock().await.recv().await {
|
||||
write_tx.send(msg).await.unwrap();
|
||||
}
|
||||
|
||||
let event = while let Some(msg) = write_rx.next().await {
|
||||
shared_channel_write
|
||||
.lock()
|
||||
.await
|
||||
.send(msg.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
};
|
||||
});
|
||||
|
||||
WebSocketConnection {
|
||||
tx: clone_shared_channel_write,
|
||||
rx: clone_shared_channel_read,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Trait which defines the behaviour of an Observer. An Observer is an object which is subscribed to
|
||||
an Observable. The Observer is notified when the Observable's data changes.
|
||||
|
|
Loading…
Reference in New Issue