2023-11-19 19:12:29 +01:00
|
|
|
use futures_util::SinkExt;
|
|
|
|
use log::*;
|
|
|
|
|
2024-01-19 14:55:23 +01:00
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
use std::time::Instant;
|
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
use wasmtimer::std::Instant;
|
|
|
|
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2023-11-19 19:12:29 +01:00
|
|
|
use safina_timer::sleep_until;
|
2024-01-19 14:55:23 +01:00
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
use wasmtimer::tokio::sleep_until;
|
|
|
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use tokio::sync::mpsc::{Receiver, Sender};
|
|
|
|
|
2023-11-22 14:23:33 +01:00
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
use tokio::task;
|
2023-11-19 19:12:29 +01:00
|
|
|
|
2023-11-14 11:13:02 +01:00
|
|
|
use super::*;
|
2023-11-19 17:08:53 +01:00
|
|
|
use crate::types;
|
2023-11-14 11:13:02 +01:00
|
|
|
|
2023-11-14 15:43:08 +01:00
|
|
|
/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms
|
|
|
|
const HEARTBEAT_ACK_TIMEOUT: u64 = 2000;
|
|
|
|
|
2023-11-14 11:13:02 +01:00
|
|
|
/// Handles sending heartbeats to the gateway in another thread
|
|
|
|
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub(super) struct HeartbeatHandler {
|
|
|
|
/// How ofter heartbeats need to be sent at a minimum
|
|
|
|
pub heartbeat_interval: Duration,
|
|
|
|
/// The send channel for the heartbeat thread
|
|
|
|
pub send: Sender<HeartbeatThreadCommunication>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl HeartbeatHandler {
|
|
|
|
pub fn new(
|
|
|
|
heartbeat_interval: Duration,
|
2023-11-19 21:15:10 +01:00
|
|
|
websocket_tx: Arc<Mutex<Sink>>,
|
2023-11-14 11:13:02 +01:00
|
|
|
kill_rc: tokio::sync::broadcast::Receiver<()>,
|
2023-11-19 17:08:53 +01:00
|
|
|
) -> Self {
|
2023-11-14 11:13:02 +01:00
|
|
|
let (send, receive) = tokio::sync::mpsc::channel(32);
|
|
|
|
let kill_receive = kill_rc.resubscribe();
|
|
|
|
|
2023-11-22 14:23:33 +01:00
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
task::spawn(async move {
|
|
|
|
Self::heartbeat_task(websocket_tx, heartbeat_interval, receive, kill_receive).await;
|
|
|
|
});
|
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
wasm_bindgen_futures::spawn_local(async move {
|
2023-11-19 17:08:53 +01:00
|
|
|
Self::heartbeat_task(websocket_tx, heartbeat_interval, receive, kill_receive).await;
|
2023-11-14 11:13:02 +01:00
|
|
|
});
|
|
|
|
|
|
|
|
Self {
|
|
|
|
heartbeat_interval,
|
|
|
|
send,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// The main heartbeat task;
|
|
|
|
///
|
|
|
|
/// Can be killed by the kill broadcast;
|
|
|
|
/// If the websocket is closed, will die out next time it tries to send a heartbeat;
|
|
|
|
pub async fn heartbeat_task(
|
2023-11-19 21:15:10 +01:00
|
|
|
websocket_tx: Arc<Mutex<Sink>>,
|
2023-11-14 11:13:02 +01:00
|
|
|
heartbeat_interval: Duration,
|
2023-11-19 19:12:29 +01:00
|
|
|
mut receive: Receiver<HeartbeatThreadCommunication>,
|
2023-11-14 11:13:02 +01:00
|
|
|
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
|
|
|
|
) {
|
2024-01-19 14:55:23 +01:00
|
|
|
let mut last_heartbeat_timestamp: Instant = Instant::now();
|
2023-11-14 11:13:02 +01:00
|
|
|
let mut last_heartbeat_acknowledged = true;
|
|
|
|
let mut last_seq_number: Option<u64> = None;
|
2024-01-19 15:06:27 +01:00
|
|
|
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2023-11-19 19:12:29 +01:00
|
|
|
safina_timer::start_timer_thread();
|
|
|
|
|
2023-11-14 11:13:02 +01:00
|
|
|
loop {
|
|
|
|
if kill_receive.try_recv().is_ok() {
|
|
|
|
trace!("GW: Closing heartbeat task");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
let timeout = if last_heartbeat_acknowledged {
|
|
|
|
heartbeat_interval
|
|
|
|
} else {
|
|
|
|
// If the server hasn't acknowledged our heartbeat we should resend it
|
|
|
|
Duration::from_millis(HEARTBEAT_ACK_TIMEOUT)
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut should_send = false;
|
|
|
|
|
|
|
|
tokio::select! {
|
|
|
|
() = sleep_until(last_heartbeat_timestamp + timeout) => {
|
|
|
|
should_send = true;
|
|
|
|
}
|
|
|
|
Some(communication) = receive.recv() => {
|
|
|
|
// If we received a seq number update, use that as the last seq number
|
|
|
|
if communication.sequence_number.is_some() {
|
|
|
|
last_seq_number = communication.sequence_number;
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(op_code) = communication.op_code {
|
|
|
|
match op_code {
|
|
|
|
GATEWAY_HEARTBEAT => {
|
|
|
|
// As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately
|
|
|
|
should_send = true;
|
|
|
|
}
|
|
|
|
GATEWAY_HEARTBEAT_ACK => {
|
|
|
|
// The server received our heartbeat
|
|
|
|
last_heartbeat_acknowledged = true;
|
|
|
|
}
|
|
|
|
_ => {}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if should_send {
|
|
|
|
trace!("GW: Sending Heartbeat..");
|
|
|
|
|
|
|
|
let heartbeat = types::GatewayHeartbeat {
|
|
|
|
op: GATEWAY_HEARTBEAT,
|
|
|
|
d: last_seq_number,
|
|
|
|
};
|
|
|
|
|
|
|
|
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
|
|
|
|
|
2023-11-19 17:08:53 +01:00
|
|
|
let msg = GatewayMessage(heartbeat_json);
|
2023-11-14 11:13:02 +01:00
|
|
|
|
2023-11-19 17:08:53 +01:00
|
|
|
let send_result = websocket_tx.lock().await.send(msg.into()).await;
|
2023-11-14 11:13:02 +01:00
|
|
|
if send_result.is_err() {
|
|
|
|
// We couldn't send, the websocket is broken
|
|
|
|
warn!("GW: Couldnt send heartbeat, websocket seems broken");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2024-01-19 14:55:23 +01:00
|
|
|
last_heartbeat_timestamp = Instant::now();
|
2023-11-14 11:13:02 +01:00
|
|
|
last_heartbeat_acknowledged = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Used for communications between the heartbeat and gateway thread.
|
|
|
|
/// Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
|
|
pub(super) struct HeartbeatThreadCommunication {
|
|
|
|
/// The opcode for the communication we received, if relevant
|
|
|
|
pub(super) op_code: Option<u8>,
|
|
|
|
/// The sequence number we got from discord, if any
|
|
|
|
pub(super) sequence_number: Option<u64>,
|
|
|
|
}
|