From 3e29a3274a7db855867991024ecb84b699cb2261 Mon Sep 17 00:00:00 2001 From: kozabrada123 <59031733+kozabrada123@users.noreply.github.com> Date: Tue, 16 Apr 2024 16:59:13 +0200 Subject: [PATCH] fix: refactor gw and vgw closures --- src/gateway/gateway.rs | 14 +++++++++++++- src/gateway/heartbeat.rs | 9 ++++----- src/voice/gateway/gateway.rs | 14 +++++++++++++- src/voice/gateway/heartbeat.rs | 9 ++++----- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/gateway/gateway.rs b/src/gateway/gateway.rs index 231590e..c146320 100644 --- a/src/gateway/gateway.rs +++ b/src/gateway/gateway.rs @@ -26,6 +26,7 @@ pub struct Gateway { websocket_send: Arc>, websocket_receive: Stream, kill_send: tokio::sync::broadcast::Sender<()>, + kill_receive: tokio::sync::broadcast::Receiver<()>, store: Arc>>>>, url: String, } @@ -75,6 +76,7 @@ impl Gateway { websocket_send: shared_websocket_send.clone(), websocket_receive, kill_send: kill_send.clone(), + kill_receive: kill_send.subscribe(), store: store.clone(), url: websocket_url.clone(), }; @@ -103,7 +105,17 @@ impl Gateway { /// Can only be stopped by closing the websocket, cannot be made to listen for kill pub async fn gateway_listen_task(&mut self) { loop { - let msg = self.websocket_receive.next().await; + let msg; + + tokio::select! { + Ok(_) = self.kill_receive.recv() => { + log::trace!("GW: Closing listener task"); + break; + } + message = self.websocket_receive.next() => { + msg = message; + } + } // PRETTYFYME: Remove inline conditional compiling #[cfg(not(target_arch = "wasm32"))] diff --git a/src/gateway/heartbeat.rs b/src/gateway/heartbeat.rs index 66f85e3..5dcc98d 100644 --- a/src/gateway/heartbeat.rs +++ b/src/gateway/heartbeat.rs @@ -77,11 +77,6 @@ impl HeartbeatHandler { let mut last_seq_number: Option = None; loop { - if kill_receive.try_recv().is_ok() { - trace!("GW: Closing heartbeat task"); - break; - } - let timeout = if last_heartbeat_acknowledged { heartbeat_interval } else { @@ -115,6 +110,10 @@ impl HeartbeatHandler { } } } + Ok(_) = kill_receive.recv() => { + log::trace!("GW: Closing heartbeat task"); + break; + } } if should_send { diff --git a/src/voice/gateway/gateway.rs b/src/voice/gateway/gateway.rs index 643785d..3b6c022 100644 --- a/src/voice/gateway/gateway.rs +++ b/src/voice/gateway/gateway.rs @@ -37,6 +37,7 @@ pub struct VoiceGateway { websocket_send: Arc>, websocket_receive: Stream, kill_send: tokio::sync::broadcast::Sender<()>, + kill_receive: tokio::sync::broadcast::Receiver<()>, } impl VoiceGateway { @@ -89,6 +90,7 @@ impl VoiceGateway { websocket_send: shared_websocket_send.clone(), websocket_receive, kill_send: kill_send.clone(), + kill_receive: kill_send.subscribe(), }; // Now we can continuously check for messages in a different task, since we aren't going to receive another hello @@ -114,7 +116,17 @@ impl VoiceGateway { /// Can only be stopped by closing the websocket, cannot be made to listen for kill pub async fn gateway_listen_task(&mut self) { loop { - let msg = self.websocket_receive.next().await; + let msg; + + tokio::select! { + Ok(_) = self.kill_receive.recv() => { + log::trace!("VGW: Closing listener task"); + break; + } + message = self.websocket_receive.next() => { + msg = message; + } + } // PRETTYFYME: Remove inline conditional compiling #[cfg(not(target_arch = "wasm32"))] diff --git a/src/voice/gateway/heartbeat.rs b/src/voice/gateway/heartbeat.rs index 3632526..2b9fde5 100644 --- a/src/voice/gateway/heartbeat.rs +++ b/src/voice/gateway/heartbeat.rs @@ -98,11 +98,6 @@ impl VoiceHeartbeatHandler { let mut nonce: u64 = starting_nonce; loop { - if kill_receive.try_recv().is_ok() { - trace!("VGW: Closing heartbeat task"); - break; - } - let timeout = if last_heartbeat_acknowledged { heartbeat_interval } else { @@ -136,6 +131,10 @@ impl VoiceHeartbeatHandler { } } } + Ok(_) = kill_receive.recv() => { + log::trace!("VGW: Closing heartbeat task"); + break; + } } if should_send {