Fix gateway heartbeat on WASM (#460)

It turns out `std::time::Instant::now()` panics WASM (see #459) and
breaks the heartbeat handler.

This pr attempts to fix that by replacing `std::time::Instant` with
`wasmtimer::std::Instant` and `safina_timer::sleep_until` with
`wasmtimer::tokio::sleep_until`.
This commit is contained in:
Flori 2024-01-19 17:24:48 +01:00 committed by GitHub
commit 65d9de88fe
5 changed files with 47 additions and 24 deletions

25
Cargo.lock generated
View File

@ -221,7 +221,6 @@ dependencies = [
"reqwest",
"rustls",
"rustls-native-certs",
"safina-timer",
"serde",
"serde-aux",
"serde_json",
@ -235,6 +234,7 @@ dependencies = [
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-bindgen-test",
"wasmtimer",
"ws_stream_wasm",
]
@ -1688,15 +1688,6 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "safina-timer"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1081a264d1a3e81b75c4bcd5696094fb6ce470c2ded14cbd47bcb5229079b9df"
dependencies = [
"once_cell",
]
[[package]]
name = "schannel"
version = "0.1.23"
@ -2688,6 +2679,20 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "wasmtimer"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f656cd8858a5164932d8a90f936700860976ec21eb00e0fe2aa8cab13f6b4cf"
dependencies = [
"futures",
"js-sys",
"parking_lot",
"pin-utils",
"slab",
"wasm-bindgen",
]
[[package]]
name = "web-sys"
version = "0.3.66"

View File

@ -52,7 +52,6 @@ sqlx = { version = "0.7.3", features = [
"runtime-tokio-native-tls",
"any",
], optional = true }
safina-timer = "0.1.11"
rand = "0.8.5"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
@ -69,6 +68,7 @@ hostname = "0.3.1"
getrandom = { version = "0.2.12", features = ["js"] }
ws_stream_wasm = "0.7.4"
wasm-bindgen-futures = "0.4.39"
wasmtimer = "0.2.0"
[dev-dependencies]
lazy_static = "1.4.0"

View File

@ -8,6 +8,11 @@ use chorus::{
use std::{sync::Arc, time::Duration};
use tokio::{self};
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;
// This example creates a simple gateway connection and a basic observer struct
// Due to certain limitations all observers must impl debug
@ -54,10 +59,9 @@ async fn main() {
let mut identify = GatewayIdentifyPayload::common();
identify.token = token;
gateway.send_identify(identify).await;
safina_timer::start_timer_thread();
// Do something on the main thread so we don't quit
loop {
safina_timer::sleep_for(Duration::MAX).await
sleep(Duration::from_secs(3600)).await;
}
}

View File

@ -3,6 +3,11 @@ use std::time::Duration;
use chorus::gateway::Gateway;
use chorus::{self, types::GatewayIdentifyPayload};
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;
/// This example creates a simple gateway connection and a session with an Identify event
#[tokio::main(flavor = "current_thread")]
async fn main() {
@ -10,7 +15,7 @@ async fn main() {
let websocket_url_spacebar = "wss://gateway.old.server.spacebar.chat/".to_string();
// Initiate the gateway connection, starting a listener in one thread and a heartbeat handler in another
let _ = Gateway::spawn(websocket_url_spacebar).await.unwrap();
let gateway = Gateway::spawn(websocket_url_spacebar).await.unwrap();
// At this point, we are connected to the server and are sending heartbeats, however we still haven't authenticated
@ -26,10 +31,10 @@ async fn main() {
identify.token = token;
// Send off the event
safina_timer::start_timer_thread();
gateway.send_identify(identify).await;
// Do something on the main thread so we don't quit
loop {
safina_timer::sleep_for(Duration::MAX).await
sleep(Duration::from_secs(3600)).await;
}
}

View File

@ -1,9 +1,20 @@
use futures_util::SinkExt;
use log::*;
use std::time::{self, Duration, Instant};
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep_until;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep_until;
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
use safina_timer::sleep_until;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
@ -57,12 +68,10 @@ impl HeartbeatHandler {
mut receive: Receiver<HeartbeatThreadCommunication>,
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
) {
let mut last_heartbeat_timestamp: Instant = time::Instant::now();
let mut last_heartbeat_timestamp: Instant = Instant::now();
let mut last_heartbeat_acknowledged = true;
let mut last_seq_number: Option<u64> = None;
safina_timer::start_timer_thread();
loop {
if kill_receive.try_recv().is_ok() {
trace!("GW: Closing heartbeat task");
@ -123,7 +132,7 @@ impl HeartbeatHandler {
break;
}
last_heartbeat_timestamp = time::Instant::now();
last_heartbeat_timestamp = Instant::now();
last_heartbeat_acknowledged = false;
}
}