chorus/src/gateway/heartbeat.rs

151 lines
5.2 KiB
Rust
Raw Normal View History

2023-11-19 19:12:29 +01:00
use futures_util::SinkExt;
use log::*;
2024-01-19 14:55:23 +01:00
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
2024-01-19 14:55:23 +01:00
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep_until;
2024-01-19 14:55:23 +01:00
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep_until;
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
2023-11-19 19:12:29 +01:00
2023-11-14 11:13:02 +01:00
use super::*;
use crate::types;
2023-11-14 11:13:02 +01:00
/// How long we wait for a heartbeat ACK before resending our heartbeat, in milliseconds
const HEARTBEAT_ACK_TIMEOUT: u64 = 2000;
2023-11-14 11:13:02 +01:00
/// Handles sending heartbeats to the gateway in a separately spawned task
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used
#[derive(Debug)]
pub(super) struct HeartbeatHandler {
    /// How often heartbeats need to be sent at a minimum
    pub heartbeat_interval: Duration,
    /// The sending half of the channel used to pass [`HeartbeatThreadCommunication`]s
    /// to the heartbeat task
    pub send: Sender<HeartbeatThreadCommunication>,
}
impl HeartbeatHandler {
pub fn new(
heartbeat_interval: Duration,
2023-11-19 21:15:10 +01:00
websocket_tx: Arc<Mutex<Sink>>,
2023-11-14 11:13:02 +01:00
kill_rc: tokio::sync::broadcast::Receiver<()>,
) -> Self {
2023-11-14 11:13:02 +01:00
let (send, receive) = tokio::sync::mpsc::channel(32);
let kill_receive = kill_rc.resubscribe();
#[cfg(not(target_arch = "wasm32"))]
task::spawn(async move {
Self::heartbeat_task(websocket_tx, heartbeat_interval, receive, kill_receive).await;
});
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
Self::heartbeat_task(websocket_tx, heartbeat_interval, receive, kill_receive).await;
2023-11-14 11:13:02 +01:00
});
Self {
heartbeat_interval,
send,
}
}
/// The main heartbeat task;
///
/// Can be killed by the kill broadcast;
/// If the websocket is closed, will die out next time it tries to send a heartbeat;
pub async fn heartbeat_task(
2023-11-19 21:15:10 +01:00
websocket_tx: Arc<Mutex<Sink>>,
2023-11-14 11:13:02 +01:00
heartbeat_interval: Duration,
2023-11-19 19:12:29 +01:00
mut receive: Receiver<HeartbeatThreadCommunication>,
2023-11-14 11:13:02 +01:00
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
) {
2024-01-19 14:55:23 +01:00
let mut last_heartbeat_timestamp: Instant = Instant::now();
2023-11-14 11:13:02 +01:00
let mut last_heartbeat_acknowledged = true;
let mut last_seq_number: Option<u64> = None;
2024-01-21 17:07:30 +01:00
2023-11-14 11:13:02 +01:00
loop {
if kill_receive.try_recv().is_ok() {
trace!("GW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged {
heartbeat_interval
} else {
// If the server hasn't acknowledged our heartbeat we should resend it
Duration::from_millis(HEARTBEAT_ACK_TIMEOUT)
};
let mut should_send = false;
tokio::select! {
() = sleep_until(last_heartbeat_timestamp + timeout) => {
should_send = true;
}
Some(communication) = receive.recv() => {
// If we received a seq number update, use that as the last seq number
if communication.sequence_number.is_some() {
last_seq_number = communication.sequence_number;
}
if let Some(op_code) = communication.op_code {
match op_code {
GATEWAY_HEARTBEAT => {
// As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately
should_send = true;
}
GATEWAY_HEARTBEAT_ACK => {
// The server received our heartbeat
last_heartbeat_acknowledged = true;
}
_ => {}
}
}
}
}
if should_send {
trace!("GW: Sending Heartbeat..");
let heartbeat = types::GatewayHeartbeat {
op: GATEWAY_HEARTBEAT,
d: last_seq_number,
};
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
let msg = GatewayMessage(heartbeat_json);
2023-11-14 11:13:02 +01:00
let send_result = websocket_tx.lock().await.send(msg.into()).await;
2023-11-14 11:13:02 +01:00
if send_result.is_err() {
// We couldn't send, the websocket is broken
warn!("GW: Couldnt send heartbeat, websocket seems broken");
break;
}
2024-01-19 14:55:23 +01:00
last_heartbeat_timestamp = Instant::now();
2023-11-14 11:13:02 +01:00
last_heartbeat_acknowledged = false;
}
}
}
}
/// Used for communications between the heartbeat and gateway thread.
/// Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server
///
/// Both fields are optional, so a single message may carry an op code, a sequence number
/// update, or both.
#[derive(Clone, Copy, Debug)]
pub(super) struct HeartbeatThreadCommunication {
    /// The opcode for the communication we received, if relevant
    pub(super) op_code: Option<u8>,
    /// The sequence number we got from discord, if any
    pub(super) sequence_number: Option<u64>,
}