diff --git a/src/gateway.rs b/src/gateway.rs index ef9905c..0e5a10d 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -66,6 +66,9 @@ const GATEWAY_CALL_SYNC: u8 = 13; /// See [types::LazyRequest] const GATEWAY_LAZY_REQUEST: u8 = 14; +/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms +const HEARTBEAT_ACK_TIMEOUT: u128 = 2000; + #[derive(Debug)] /** Represents a handle to a Gateway connection. A Gateway connection will create observable @@ -1180,6 +1183,25 @@ impl Gateway { } GATEWAY_HEARTBEAT_ACK => { println!("GW: Received Heartbeat ACK"); + + if self.heartbeat_handler.is_none() { + return; + } + + // Tell the heartbeat handler we received an ack + + let heartbeat_communication = HeartbeatThreadCommunication { + sequence_number: gateway_payload.sequence_number, + op_code: Some(GATEWAY_HEARTBEAT_ACK), + }; + + self.heartbeat_handler + .as_mut() + .unwrap() + .send + .send(heartbeat_communication) + .await + .unwrap(); } GATEWAY_IDENTIFY | GATEWAY_UPDATE_PRESENCE @@ -1245,6 +1267,7 @@ impl HeartbeatHandler { let handle: JoinHandle<()> = task::spawn(async move { let mut last_heartbeat_timestamp: Instant = time::Instant::now(); + let mut last_heartbeat_acknowledged = true; let mut last_seq_number: Option = None; loop { @@ -1272,13 +1295,20 @@ impl HeartbeatHandler { should_send = true; } GATEWAY_HEARTBEAT_ACK => { - todo!() + // The server received our heartbeat + last_heartbeat_acknowledged = true; } _ => {} } } } + // If the server hasn't acknowledged our heartbeat we should resend it + if !last_heartbeat_acknowledged && last_heartbeat_timestamp.elapsed().as_millis() > HEARTBEAT_ACK_TIMEOUT { + should_send = true; + println!("GW: Timed out waiting for a heartbeat ack, resending"); + } + if should_send { println!("GW: Sending Heartbeat.."); @@ -1294,6 +1324,7 @@ impl HeartbeatHandler { websocket_tx.lock().await.send(msg).await.unwrap(); last_heartbeat_timestamp = time::Instant::now(); + last_heartbeat_acknowledged = false; } } });