fix: refactor gw and vgw closures

This commit is contained in:
kozabrada123 2024-04-16 16:59:13 +02:00
parent ae1e36fbb5
commit 3e29a3274a
4 changed files with 34 additions and 12 deletions

View File

@ -26,6 +26,7 @@ pub struct Gateway {
websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>,
kill_receive: tokio::sync::broadcast::Receiver<()>,
store: Arc<Mutex<HashMap<Snowflake, Arc<RwLock<ObservableObject>>>>>,
url: String,
}
@ -75,6 +76,7 @@ impl Gateway {
websocket_send: shared_websocket_send.clone(),
websocket_receive,
kill_send: kill_send.clone(),
kill_receive: kill_send.subscribe(),
store: store.clone(),
url: websocket_url.clone(),
};
@ -103,7 +105,17 @@ impl Gateway {
/// Can only be stopped by closing the websocket, cannot be made to listen for kill
pub async fn gateway_listen_task(&mut self) {
loop {
let msg = self.websocket_receive.next().await;
let msg;
tokio::select! {
Ok(_) = self.kill_receive.recv() => {
log::trace!("GW: Closing listener task");
break;
}
message = self.websocket_receive.next() => {
msg = message;
}
}
// PRETTYFYME: Remove inline conditional compiling
#[cfg(not(target_arch = "wasm32"))]

View File

@ -77,11 +77,6 @@ impl HeartbeatHandler {
let mut last_seq_number: Option<u64> = None;
loop {
if kill_receive.try_recv().is_ok() {
trace!("GW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged {
heartbeat_interval
} else {
@ -115,6 +110,10 @@ impl HeartbeatHandler {
}
}
}
Ok(_) = kill_receive.recv() => {
log::trace!("GW: Closing heartbeat task");
break;
}
}
if should_send {

View File

@ -37,6 +37,7 @@ pub struct VoiceGateway {
websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>,
kill_receive: tokio::sync::broadcast::Receiver<()>,
}
impl VoiceGateway {
@ -89,6 +90,7 @@ impl VoiceGateway {
websocket_send: shared_websocket_send.clone(),
websocket_receive,
kill_send: kill_send.clone(),
kill_receive: kill_send.subscribe(),
};
// Now we can continuously check for messages in a different task, since we aren't going to receive another hello
@ -114,7 +116,17 @@ impl VoiceGateway {
/// Can only be stopped by closing the websocket, cannot be made to listen for kill
pub async fn gateway_listen_task(&mut self) {
loop {
let msg = self.websocket_receive.next().await;
let msg;
tokio::select! {
Ok(_) = self.kill_receive.recv() => {
log::trace!("VGW: Closing listener task");
break;
}
message = self.websocket_receive.next() => {
msg = message;
}
}
// PRETTYFYME: Remove inline conditional compiling
#[cfg(not(target_arch = "wasm32"))]

View File

@ -98,11 +98,6 @@ impl VoiceHeartbeatHandler {
let mut nonce: u64 = starting_nonce;
loop {
if kill_receive.try_recv().is_ok() {
trace!("VGW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged {
heartbeat_interval
} else {
@ -136,6 +131,10 @@ impl VoiceHeartbeatHandler {
}
}
}
Ok(_) = kill_receive.recv() => {
log::trace!("VGW: Closing heartbeat task");
break;
}
}
if should_send {