Move Heartbeathandler code together
This commit is contained in:
parent
f4ae80fee9
commit
70812c529a
|
@ -450,101 +450,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles sending heartbeats to the gateway in another thread
|
|
||||||
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct HeartbeatHandler {
|
|
||||||
/// How ofter heartbeats need to be sent at a minimum
|
|
||||||
pub heartbeat_interval: Duration,
|
|
||||||
/// The send channel for the heartbeat thread
|
|
||||||
pub send: Sender<HeartbeatThreadCommunication>,
|
|
||||||
/// The handle of the thread
|
|
||||||
handle: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HeartbeatHandler {
|
|
||||||
pub async fn heartbeat_task<T: MessageCapable + Send + 'static, S: Sink<T> + Send>(
|
|
||||||
websocket_tx: Arc<Mutex<SplitSink<S, T>>>,
|
|
||||||
heartbeat_interval: Duration,
|
|
||||||
mut receive: tokio::sync::mpsc::Receiver<HeartbeatThreadCommunication>,
|
|
||||||
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
|
|
||||||
) {
|
|
||||||
let mut last_heartbeat_timestamp: Instant = time::Instant::now();
|
|
||||||
let mut last_heartbeat_acknowledged = true;
|
|
||||||
let mut last_seq_number: Option<u64> = None;
|
|
||||||
safina_timer::start_timer_thread();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
if kill_receive.try_recv().is_ok() {
|
|
||||||
trace!("GW: Closing heartbeat task");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let timeout = if last_heartbeat_acknowledged {
|
|
||||||
heartbeat_interval
|
|
||||||
} else {
|
|
||||||
// If the server hasn't acknowledged our heartbeat we should resend it
|
|
||||||
Duration::from_millis(HEARTBEAT_ACK_TIMEOUT)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut should_send = false;
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
() = sleep_until(last_heartbeat_timestamp + timeout) => {
|
|
||||||
should_send = true;
|
|
||||||
}
|
|
||||||
Some(communication) = receive.recv() => {
|
|
||||||
// If we received a seq number update, use that as the last seq number
|
|
||||||
if communication.sequence_number.is_some() {
|
|
||||||
last_seq_number = communication.sequence_number;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(op_code) = communication.op_code {
|
|
||||||
match op_code {
|
|
||||||
GATEWAY_HEARTBEAT => {
|
|
||||||
// As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately
|
|
||||||
should_send = true;
|
|
||||||
}
|
|
||||||
GATEWAY_HEARTBEAT_ACK => {
|
|
||||||
// The server received our heartbeat
|
|
||||||
last_heartbeat_acknowledged = true;
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if should_send {
|
|
||||||
trace!("GW: Sending Heartbeat..");
|
|
||||||
|
|
||||||
let heartbeat = types::GatewayHeartbeat {
|
|
||||||
op: GATEWAY_HEARTBEAT,
|
|
||||||
d: last_seq_number,
|
|
||||||
};
|
|
||||||
|
|
||||||
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
|
|
||||||
|
|
||||||
let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json);
|
|
||||||
|
|
||||||
let send_result = websocket_tx
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.send(MessageCapable::from_str(msg.to_string().as_str()))
|
|
||||||
.await;
|
|
||||||
if send_result.is_err() {
|
|
||||||
// We couldn't send, the websocket is broken
|
|
||||||
warn!("GW: Couldnt send heartbeat, websocket seems broken");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
last_heartbeat_timestamp = time::Instant::now();
|
|
||||||
last_heartbeat_acknowledged = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait(?Send)]
|
#[async_trait(?Send)]
|
||||||
pub trait GatewayHandleCapable<T, S>
|
pub trait GatewayHandleCapable<T, S>
|
||||||
where
|
where
|
||||||
|
@ -653,6 +558,100 @@ where
|
||||||
async fn close(&self);
|
async fn close(&self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handles sending heartbeats to the gateway in another thread
|
||||||
|
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct HeartbeatHandler {
|
||||||
|
/// How ofter heartbeats need to be sent at a minimum
|
||||||
|
pub heartbeat_interval: Duration,
|
||||||
|
/// The send channel for the heartbeat thread
|
||||||
|
pub send: Sender<HeartbeatThreadCommunication>,
|
||||||
|
/// The handle of the thread
|
||||||
|
handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HeartbeatHandler {
|
||||||
|
pub async fn heartbeat_task<T: MessageCapable + Send + 'static, S: Sink<T> + Send>(
|
||||||
|
websocket_tx: Arc<Mutex<SplitSink<S, T>>>,
|
||||||
|
heartbeat_interval: Duration,
|
||||||
|
mut receive: tokio::sync::mpsc::Receiver<HeartbeatThreadCommunication>,
|
||||||
|
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
|
||||||
|
) {
|
||||||
|
let mut last_heartbeat_timestamp: Instant = time::Instant::now();
|
||||||
|
let mut last_heartbeat_acknowledged = true;
|
||||||
|
let mut last_seq_number: Option<u64> = None;
|
||||||
|
safina_timer::start_timer_thread();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if kill_receive.try_recv().is_ok() {
|
||||||
|
trace!("GW: Closing heartbeat task");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let timeout = if last_heartbeat_acknowledged {
|
||||||
|
heartbeat_interval
|
||||||
|
} else {
|
||||||
|
// If the server hasn't acknowledged our heartbeat we should resend it
|
||||||
|
Duration::from_millis(HEARTBEAT_ACK_TIMEOUT)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut should_send = false;
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
() = sleep_until(last_heartbeat_timestamp + timeout) => {
|
||||||
|
should_send = true;
|
||||||
|
}
|
||||||
|
Some(communication) = receive.recv() => {
|
||||||
|
// If we received a seq number update, use that as the last seq number
|
||||||
|
if communication.sequence_number.is_some() {
|
||||||
|
last_seq_number = communication.sequence_number;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(op_code) = communication.op_code {
|
||||||
|
match op_code {
|
||||||
|
GATEWAY_HEARTBEAT => {
|
||||||
|
// As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately
|
||||||
|
should_send = true;
|
||||||
|
}
|
||||||
|
GATEWAY_HEARTBEAT_ACK => {
|
||||||
|
// The server received our heartbeat
|
||||||
|
last_heartbeat_acknowledged = true;
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if should_send {
|
||||||
|
trace!("GW: Sending Heartbeat..");
|
||||||
|
|
||||||
|
let heartbeat = types::GatewayHeartbeat {
|
||||||
|
op: GATEWAY_HEARTBEAT,
|
||||||
|
d: last_seq_number,
|
||||||
|
};
|
||||||
|
|
||||||
|
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
|
||||||
|
|
||||||
|
let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json);
|
||||||
|
|
||||||
|
let send_result = websocket_tx
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.send(MessageCapable::from_str(msg.to_string().as_str()))
|
||||||
|
.await;
|
||||||
|
if send_result.is_err() {
|
||||||
|
// We couldn't send, the websocket is broken
|
||||||
|
warn!("GW: Couldnt send heartbeat, websocket seems broken");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
last_heartbeat_timestamp = time::Instant::now();
|
||||||
|
last_heartbeat_acknowledged = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
// TODO: Make me not a trait!!
|
// TODO: Make me not a trait!!
|
||||||
pub trait HeartbeatHandlerCapable<T: MessageCapable + Send + 'static, S: Sink<T>> {
|
pub trait HeartbeatHandlerCapable<T: MessageCapable + Send + 'static, S: Sink<T>> {
|
||||||
|
|
Loading…
Reference in New Issue