From 959bbac9bdbf0f5f48475bac46f8e9451c125c6b Mon Sep 17 00:00:00 2001 From: kozabrada123 <“kozabrada123@users.noreply.github.com”> Date: Sat, 13 May 2023 09:47:12 +0200 Subject: [PATCH] Fix nested task issues --- src/gateway.rs | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index b84e9b9..969101a 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -108,7 +108,7 @@ impl<'a> Gateway { .await { Ok(ws_stream) => ws_stream, - Err(e) => panic!("{:?}", e), + Err(e) => return Err(e), }; let (ws_tx, mut ws_rx) = ws_stream.split(); @@ -119,15 +119,28 @@ impl<'a> Gateway { let shared_events = gateway.events.clone(); + // Wait for the first hello and then spawn both tasks so we avoid nested tasks + // This automatically spawns the heartbeat task, but from the main thread + let msg = ws_rx.next().await.unwrap().unwrap(); + let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap(); + + if gateway_payload.op != 10 { + println!("Recieved non hello on gateway init, what is happening?"); + return Err(tokio_tungstenite::tungstenite::Error::Protocol(tokio_tungstenite::tungstenite::error::ProtocolError::InvalidOpcode(gateway_payload.op))) + } + + println!("GW: Received Hello"); + + let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); + gateway.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, shared_tx.clone())); + + // Now we can continously check for messages in a different task, since we aren't going to receive another hello task::spawn(async move { loop { - println!("Waiting for next event.."); let msg = ws_rx.next().await; - println!("Received event or sth"); if msg.as_ref().is_some() { let msg_unwrapped = msg.unwrap().unwrap(); gateway.handle_event(msg_unwrapped).await; - println!("Handled the event"); }; } }); @@ -265,10 +278,9 @@ impl<'a> Gateway { 9 => {todo!()} // Hello // Starts our heartbeat + // We should have already handled this in gateway init 10 => { - println!("GW: Received Hello"); - let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap(); - self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket_tx.clone())); + panic!("Recieved hello when it was unexpected"); } // Heartbeat ACK 11 => { @@ -328,8 +340,7 @@ impl HeartbeatHandler { let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); - websocket_tx.lock() - .await + websocket_tx.lock().await .send(msg) .await .unwrap();