fix: refactor gw and vgw closures

This commit is contained in:
kozabrada123 2024-04-16 16:59:13 +02:00
parent 7cbb3eca29
commit 9aaeb579a1
4 changed files with 34 additions and 12 deletions

View File

@ -26,6 +26,7 @@ pub struct Gateway {
websocket_send: Arc<Mutex<Sink>>, websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream, websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>, kill_send: tokio::sync::broadcast::Sender<()>,
kill_receive: tokio::sync::broadcast::Receiver<()>,
store: Arc<Mutex<HashMap<Snowflake, Arc<RwLock<ObservableObject>>>>>, store: Arc<Mutex<HashMap<Snowflake, Arc<RwLock<ObservableObject>>>>>,
url: String, url: String,
} }
@ -75,6 +76,7 @@ impl Gateway {
websocket_send: shared_websocket_send.clone(), websocket_send: shared_websocket_send.clone(),
websocket_receive, websocket_receive,
kill_send: kill_send.clone(), kill_send: kill_send.clone(),
kill_receive: kill_send.subscribe(),
store: store.clone(), store: store.clone(),
url: websocket_url.clone(), url: websocket_url.clone(),
}; };
@ -103,7 +105,17 @@ impl Gateway {
/// Can only be stopped by closing the websocket, cannot be made to listen for kill /// Can only be stopped by closing the websocket, cannot be made to listen for kill
pub async fn gateway_listen_task(&mut self) { pub async fn gateway_listen_task(&mut self) {
loop { loop {
let msg = self.websocket_receive.next().await; let msg;
tokio::select! {
Ok(_) = self.kill_receive.recv() => {
log::trace!("GW: Closing listener task");
break;
}
message = self.websocket_receive.next() => {
msg = message;
}
}
// PRETTYFYME: Remove inline conditional compiling // PRETTYFYME: Remove inline conditional compiling
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]

View File

@ -77,11 +77,6 @@ impl HeartbeatHandler {
let mut last_seq_number: Option<u64> = None; let mut last_seq_number: Option<u64> = None;
loop { loop {
if kill_receive.try_recv().is_ok() {
trace!("GW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged { let timeout = if last_heartbeat_acknowledged {
heartbeat_interval heartbeat_interval
} else { } else {
@ -115,6 +110,10 @@ impl HeartbeatHandler {
} }
} }
} }
Ok(_) = kill_receive.recv() => {
log::trace!("GW: Closing heartbeat task");
break;
}
} }
if should_send { if should_send {

View File

@ -37,6 +37,7 @@ pub struct VoiceGateway {
websocket_send: Arc<Mutex<Sink>>, websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream, websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>, kill_send: tokio::sync::broadcast::Sender<()>,
kill_receive: tokio::sync::broadcast::Receiver<()>,
} }
impl VoiceGateway { impl VoiceGateway {
@ -89,6 +90,7 @@ impl VoiceGateway {
websocket_send: shared_websocket_send.clone(), websocket_send: shared_websocket_send.clone(),
websocket_receive, websocket_receive,
kill_send: kill_send.clone(), kill_send: kill_send.clone(),
kill_receive: kill_send.subscribe(),
}; };
// Now we can continuously check for messages in a different task, since we aren't going to receive another hello // Now we can continuously check for messages in a different task, since we aren't going to receive another hello
@ -114,7 +116,17 @@ impl VoiceGateway {
/// Can only be stopped by closing the websocket, cannot be made to listen for kill /// Can only be stopped by closing the websocket, cannot be made to listen for kill
pub async fn gateway_listen_task(&mut self) { pub async fn gateway_listen_task(&mut self) {
loop { loop {
let msg = self.websocket_receive.next().await; let msg;
tokio::select! {
Ok(_) = self.kill_receive.recv() => {
log::trace!("VGW: Closing listener task");
break;
}
message = self.websocket_receive.next() => {
msg = message;
}
}
// PRETTYFYME: Remove inline conditional compiling // PRETTYFYME: Remove inline conditional compiling
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]

View File

@ -98,11 +98,6 @@ impl VoiceHeartbeatHandler {
let mut nonce: u64 = starting_nonce; let mut nonce: u64 = starting_nonce;
loop { loop {
if kill_receive.try_recv().is_ok() {
trace!("VGW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged { let timeout = if last_heartbeat_acknowledged {
heartbeat_interval heartbeat_interval
} else { } else {
@ -136,6 +131,10 @@ impl VoiceHeartbeatHandler {
} }
} }
} }
Ok(_) = kill_receive.recv() => {
log::trace!("VGW: Closing heartbeat task");
break;
}
} }
if should_send { if should_send {