Fix nested task issues

This commit is contained in:
kozabrada123 2023-05-13 09:47:12 +02:00
parent cef8fe9fdb
commit 959bbac9bd
1 changed files with 20 additions and 9 deletions

View File

@ -108,7 +108,7 @@ impl<'a> Gateway {
.await .await
{ {
Ok(ws_stream) => ws_stream, Ok(ws_stream) => ws_stream,
Err(e) => panic!("{:?}", e), Err(e) => return Err(e),
}; };
let (ws_tx, mut ws_rx) = ws_stream.split(); let (ws_tx, mut ws_rx) = ws_stream.split();
@ -119,15 +119,28 @@ impl<'a> Gateway {
let shared_events = gateway.events.clone(); let shared_events = gateway.events.clone();
// Wait for the first hello and then spawn both tasks so we avoid nested tasks
// This automatically spawns the heartbeat task, but from the main thread
let msg = ws_rx.next().await.unwrap().unwrap();
let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap();
if gateway_payload.op != 10 {
println!("Recieved non hello on gateway init, what is happening?");
return Err(tokio_tungstenite::tungstenite::Error::Protocol(tokio_tungstenite::tungstenite::error::ProtocolError::InvalidOpcode(gateway_payload.op)))
}
println!("GW: Received Hello");
let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
gateway.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, shared_tx.clone()));
// Now we can continously check for messages in a different task, since we aren't going to receive another hello
task::spawn(async move { task::spawn(async move {
loop { loop {
println!("Waiting for next event..");
let msg = ws_rx.next().await; let msg = ws_rx.next().await;
println!("Received event or sth");
if msg.as_ref().is_some() { if msg.as_ref().is_some() {
let msg_unwrapped = msg.unwrap().unwrap(); let msg_unwrapped = msg.unwrap().unwrap();
gateway.handle_event(msg_unwrapped).await; gateway.handle_event(msg_unwrapped).await;
println!("Handled the event");
}; };
} }
}); });
@ -265,10 +278,9 @@ impl<'a> Gateway {
9 => {todo!()} 9 => {todo!()}
// Hello // Hello
// Starts our heartbeat // Starts our heartbeat
// We should have already handled this in gateway init
10 => { 10 => {
println!("GW: Received Hello"); panic!("Recieved hello when it was unexpected");
let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket_tx.clone()));
} }
// Heartbeat ACK // Heartbeat ACK
11 => { 11 => {
@ -328,8 +340,7 @@ impl HeartbeatHandler {
let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json); let msg = tokio_tungstenite::tungstenite::Message::text(heartbeat_json);
websocket_tx.lock() websocket_tx.lock().await
.await
.send(msg) .send(msg)
.await .await
.unwrap(); .unwrap();