Experimental heartbeats
This commit is contained in:
parent
4446aef8e1
commit
50c4c1a542
|
@ -17,9 +17,12 @@ use serde::Serialize;
|
|||
use serde_json::from_str;
|
||||
use tokio::io;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task;
|
||||
use tokio::time;
|
||||
use tokio::time::Instant;
|
||||
use tokio_tungstenite::tungstenite::error::UrlError;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::{connect_async, connect_async_tls_with_config};
|
||||
|
@ -35,6 +38,7 @@ pub struct Gateway<'a> {
|
|||
pub token: String,
|
||||
pub events: Events<'a>,
|
||||
websocket: WebSocketConnection,
|
||||
heartbeat_handler: Option<HeartbeatHandler>
|
||||
}
|
||||
|
||||
impl<'a> Gateway<'a> {
|
||||
|
@ -47,6 +51,7 @@ impl<'a> Gateway<'a> {
|
|||
token,
|
||||
events: Events::default(),
|
||||
websocket: WebSocketConnection::new(websocket_url).await,
|
||||
heartbeat_handler: None,
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -167,19 +172,94 @@ impl<'a> Gateway<'a> {
|
|||
// Invalid Session
|
||||
9 => {todo!()}
|
||||
// Hello
|
||||
// Should start our heartbeat
|
||||
// Starts our heartbeat
|
||||
10 => {
|
||||
let gateway_hello: HelloData = serde_json::from_str(gateway_payload.d.unwrap().as_ref()).unwrap();
|
||||
self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone()));
|
||||
}
|
||||
// Heartbeat ACK
|
||||
11 => {}
|
||||
2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)}
|
||||
_ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)}
|
||||
}
|
||||
|
||||
// If we have an active heartbeat thread and we received a seq number we should let it know
|
||||
if gateway_payload.s.is_some() {
|
||||
if self.heartbeat_handler.is_some() {
|
||||
|
||||
let heartbeat_communication = HeartbeatThreadCommunication { op: gateway_payload.op, d: gateway_payload.s.unwrap() };
|
||||
|
||||
self.heartbeat_handler.as_mut().unwrap().tx.send(heartbeat_communication).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Handles sending heartbeats to the gateway in another thread
|
||||
*/
|
||||
struct HeartbeatHandler {
|
||||
/// The heartbeat interval in milliseconds
|
||||
heartbeat_interval: u128,
|
||||
tx: Sender<HeartbeatThreadCommunication>,
|
||||
}
|
||||
|
||||
impl HeartbeatHandler {
|
||||
pub fn new(heartbeat_interval: u128, websocket_tx: Arc<Mutex<Sender<tokio_tungstenite::tungstenite::Message>>>) -> HeartbeatHandler {
|
||||
let (mut tx, mut rx) = mpsc::channel(32);
|
||||
|
||||
task::spawn(async move {
|
||||
let mut last_heartbeat: Instant = time::Instant::now();
|
||||
let mut last_seq_number: Option<u64> = None;
|
||||
|
||||
loop {
|
||||
|
||||
// If we received a seq number update, use that as the last seq number
|
||||
let hb_communication: Option<HeartbeatThreadCommunication> = rx.recv().await;
|
||||
while hb_communication.is_some() {
|
||||
last_seq_number = Some(hb_communication.unwrap().d);
|
||||
}
|
||||
|
||||
if last_heartbeat.elapsed().as_millis() > heartbeat_interval {
|
||||
|
||||
let heartbeat = GatewayHeartbeat {
|
||||
op: 1,
|
||||
d: last_seq_number
|
||||
};
|
||||
|
||||
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
|
||||
|
||||
let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json);
|
||||
|
||||
websocket_tx.lock()
|
||||
.await
|
||||
.send(msg)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
last_heartbeat = time::Instant::now();
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
Self { heartbeat_interval, tx }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Used to communicate with the main thread.
|
||||
Either signifies a sequence number update or a received heartbeat ack
|
||||
*/
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct HeartbeatThreadCommunication {
|
||||
/// An opcode for the communication we received
|
||||
op: u8,
|
||||
/// The sequence number we got from discord
|
||||
d: u64
|
||||
}
|
||||
|
||||
struct WebSocketConnection {
|
||||
rx: Arc<Mutex<Receiver<tokio_tungstenite::tungstenite::Message>>>,
|
||||
tx: Arc<Mutex<Sender<tokio_tungstenite::tungstenite::Message>>>,
|
||||
|
|
Loading…
Reference in New Issue