diff --git a/Cargo.lock b/Cargo.lock index 2d4655a..91c5f98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,6 +233,7 @@ dependencies = [ "lazy_static", "log", "poem", + "pubserve", "rand", "regex", "reqwest", @@ -1594,6 +1595,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "pubserve" +version = "1.1.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1781b2a51798c98a381e839e61bc5ce6426bd89bb9c3f9142de2086a80591cd" +dependencies = [ + "async-trait", +] + [[package]] name = "quote" version = "1.0.36" diff --git a/Cargo.toml b/Cargo.toml index 003de18..ad0795c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ crypto_secretbox = { version = "0.1.1", optional = true } rand = "0.8.5" flate2 = { version = "1.0.30", optional = true } webpki-roots = "0.26.3" +pubserve = { version = "1.1.0-alpha.1", features = ["async", "send"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] rustls = "0.21.10" diff --git a/examples/gateway_observers.rs b/examples/gateway_observers.rs index 17390b6..96bd56b 100644 --- a/examples/gateway_observers.rs +++ b/examples/gateway_observers.rs @@ -17,9 +17,9 @@ use async_trait::async_trait; use chorus::gateway::{Gateway, GatewayOptions}; use chorus::{ self, - gateway::Observer, types::{GatewayIdentifyPayload, GatewayReady}, }; +use pubserve::Subscriber; use std::{sync::Arc, time::Duration}; use tokio::{self}; @@ -38,7 +38,7 @@ pub struct ExampleObserver {} // The Observer trait can be implemented for a struct for a given websocketevent to handle observing it // One struct can be an observer of multiple websocketevents, if needed #[async_trait] -impl Observer for ExampleObserver { +impl Subscriber for ExampleObserver { // After we subscribe to an event this function is called every time we receive it async fn update(&self, _data: &GatewayReady) { println!("Observed Ready!"); @@ -56,7 +56,9 @@ async fn main() { let options = GatewayOptions::default(); // Initiate the gateway connection - let gateway = Gateway::spawn(gateway_websocket_url, options).await.unwrap(); + let gateway = Gateway::spawn(gateway_websocket_url, options) + .await + .unwrap(); // Create an instance of our observer let observer = ExampleObserver {}; diff --git a/src/gateway/events.rs b/src/gateway/events.rs index 8d38cca..049434b 100644 --- a/src/gateway/events.rs +++ b/src/gateway/events.rs @@ -2,6 +2,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. +use pubserve::Publisher; + use super::*; use crate::types; @@ -23,144 +25,144 @@ pub struct Events { pub call: Call, pub voice: Voice, pub webhooks: Webhooks, - pub gateway_identify_payload: GatewayEvent, - pub gateway_resume: GatewayEvent, - pub error: GatewayEvent, + pub gateway_identify_payload: Publisher, + pub gateway_resume: Publisher, + pub error: Publisher, } #[derive(Default, Debug)] pub struct Application { - pub command_permissions_update: GatewayEvent, + pub command_permissions_update: Publisher, } #[derive(Default, Debug)] pub struct AutoModeration { - pub rule_create: GatewayEvent, - pub rule_update: GatewayEvent, - pub rule_delete: GatewayEvent, - pub action_execution: GatewayEvent, + pub rule_create: Publisher, + pub rule_update: Publisher, + pub rule_delete: Publisher, + pub action_execution: Publisher, } #[derive(Default, Debug)] pub struct Session { - pub ready: GatewayEvent, - pub ready_supplemental: GatewayEvent, - pub replace: GatewayEvent, - pub reconnect: GatewayEvent, - pub invalid: GatewayEvent, + pub ready: Publisher, + pub ready_supplemental: Publisher, + pub replace: Publisher, + pub reconnect: Publisher, + pub invalid: Publisher, } #[derive(Default, Debug)] pub struct StageInstance { - pub create: GatewayEvent, - pub update: GatewayEvent, - pub delete: GatewayEvent, + pub create: Publisher, + pub update: Publisher, + pub delete: Publisher, } #[derive(Default, Debug)] pub struct Message { - pub create: GatewayEvent, - pub update: GatewayEvent, - pub delete: GatewayEvent, - pub delete_bulk: GatewayEvent, - pub reaction_add: GatewayEvent, - pub reaction_remove: GatewayEvent, - pub reaction_remove_all: GatewayEvent, - pub reaction_remove_emoji: GatewayEvent, - pub ack: GatewayEvent, + pub create: Publisher, + pub update: Publisher, + pub delete: Publisher, + pub delete_bulk: Publisher, + pub reaction_add: Publisher, + pub reaction_remove: Publisher, + pub reaction_remove_all: Publisher, + pub reaction_remove_emoji: Publisher, + pub ack: Publisher, } #[derive(Default, Debug)] pub struct User { - pub update: GatewayEvent, - pub guild_settings_update: GatewayEvent, - pub presence_update: GatewayEvent, - pub typing_start: GatewayEvent, + pub update: Publisher, + pub guild_settings_update: Publisher, + pub presence_update: Publisher, + pub typing_start: Publisher, } #[derive(Default, Debug)] pub struct Relationship { - pub add: GatewayEvent, - pub remove: GatewayEvent, + pub add: Publisher, + pub remove: Publisher, } #[derive(Default, Debug)] pub struct Channel { - pub create: GatewayEvent, - pub update: GatewayEvent, - pub unread_update: GatewayEvent, - pub delete: GatewayEvent, - pub pins_update: GatewayEvent, + pub create: Publisher, + pub update: Publisher, + pub unread_update: Publisher, + pub delete: Publisher, + pub pins_update: Publisher, } #[derive(Default, Debug)] pub struct Thread { - pub create: GatewayEvent, - pub update: GatewayEvent, - pub delete: GatewayEvent, - pub list_sync: GatewayEvent, - pub member_update: GatewayEvent, - pub members_update: GatewayEvent, + pub create: Publisher, + pub update: Publisher, + pub delete: Publisher, + pub list_sync: Publisher, + pub member_update: Publisher, + pub members_update: Publisher, } #[derive(Default, Debug)] pub struct Guild { - pub create: GatewayEvent, - pub update: GatewayEvent, - pub delete: GatewayEvent, - pub audit_log_entry_create: GatewayEvent, - pub ban_add: GatewayEvent, - pub ban_remove: GatewayEvent, - pub emojis_update: GatewayEvent, - pub stickers_update: GatewayEvent, - pub integrations_update: GatewayEvent, - pub member_add: GatewayEvent, - pub member_remove: GatewayEvent, - pub member_update: GatewayEvent, - pub members_chunk: GatewayEvent, - pub role_create: GatewayEvent, - pub role_update: GatewayEvent, - pub role_delete: GatewayEvent, - pub role_scheduled_event_create: GatewayEvent, - pub role_scheduled_event_update: GatewayEvent, - pub role_scheduled_event_delete: GatewayEvent, - pub role_scheduled_event_user_add: GatewayEvent, - pub role_scheduled_event_user_remove: GatewayEvent, - pub passive_update_v1: GatewayEvent, + pub create: Publisher, + pub update: Publisher, + pub delete: Publisher, + pub audit_log_entry_create: Publisher, + pub ban_add: Publisher, + pub ban_remove: Publisher, + pub emojis_update: Publisher, + pub stickers_update: Publisher, + pub integrations_update: Publisher, + pub member_add: Publisher, + pub member_remove: Publisher, + pub member_update: Publisher, + pub members_chunk: Publisher, + pub role_create: Publisher, + pub role_update: Publisher, + pub role_delete: Publisher, + pub role_scheduled_event_create: Publisher, + pub role_scheduled_event_update: Publisher, + pub role_scheduled_event_delete: Publisher, + pub role_scheduled_event_user_add: Publisher, + pub role_scheduled_event_user_remove: Publisher, + pub passive_update_v1: Publisher, } #[derive(Default, Debug)] pub struct Invite { - pub create: GatewayEvent, - pub delete: GatewayEvent, + pub create: Publisher, + pub delete: Publisher, } #[derive(Default, Debug)] pub struct Integration { - pub create: GatewayEvent, - pub update: GatewayEvent, - pub delete: GatewayEvent, + pub create: Publisher, + pub update: Publisher, + pub delete: Publisher, } #[derive(Default, Debug)] pub struct Interaction { - pub create: GatewayEvent, + pub create: Publisher, } #[derive(Default, Debug)] pub struct Call { - pub create: GatewayEvent, - pub update: GatewayEvent, - pub delete: GatewayEvent, + pub create: Publisher, + pub update: Publisher, + pub delete: Publisher, } #[derive(Default, Debug)] pub struct Voice { - pub state_update: GatewayEvent, - pub server_update: GatewayEvent, + pub state_update: Publisher, + pub server_update: Publisher, } #[derive(Default, Debug)] pub struct Webhooks { - pub update: GatewayEvent, + pub update: Publisher, } diff --git a/src/gateway/gateway.rs b/src/gateway/gateway.rs index 1312137..f1fc03c 100644 --- a/src/gateway/gateway.rs +++ b/src/gateway/gateway.rs @@ -7,6 +7,7 @@ use std::time::Duration; use flate2::Decompress; use futures_util::{SinkExt, StreamExt}; use log::*; +use pubserve::Publisher; #[cfg(not(target_arch = "wasm32"))] use tokio::task; @@ -197,7 +198,7 @@ impl Gateway { #[allow(dead_code)] // TODO: Remove this allow annotation async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>( data: &'a str, - event: &mut GatewayEvent, + event: &mut Publisher, ) -> Result<(), serde_json::Error> { let data_deserialize_result: Result = serde_json::from_str(data); @@ -205,7 +206,7 @@ impl Gateway { return Err(data_deserialize_result.err().unwrap()); } - event.notify(data_deserialize_result.unwrap()).await; + event.publish(data_deserialize_result.unwrap()).await; Ok(()) } @@ -253,7 +254,7 @@ impl Gateway { if let Some(error) = msg.error() { warn!("GW: Received error {:?}, connection will close..", error); self.close().await; - self.events.lock().await.error.notify(error).await; + self.events.lock().await.error.publish(error).await; } else { warn!( "Message unrecognised: {:?}, please open an issue on the chorus github", @@ -292,7 +293,7 @@ impl Gateway { let id = if message.id().is_some() { message.id().unwrap() } else { - event.notify(message).await; + event.publish(message).await; return; }; if let Some(to_update) = store.get(&id) { @@ -314,7 +315,7 @@ impl Gateway { } } )? - event.notify(message).await; + event.publish(message).await; } } },)* @@ -329,7 +330,7 @@ impl Gateway { return; } Ok(sessions) => { - self.events.lock().await.session.replace.notify( + self.events.lock().await.session.replace.publish( types::SessionsReplace {sessions} ).await; } @@ -446,7 +447,7 @@ impl Gateway { .await .session .reconnect - .notify(reconnect) + .publish(reconnect) .await; } GATEWAY_INVALID_SESSION => { @@ -471,7 +472,7 @@ impl Gateway { .await .session .invalid - .notify(invalid_session) + .publish(invalid_session) .await; } // Starts our heartbeat diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 3e96af0..fcbd125 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -82,56 +82,3 @@ pub type ObservableObject = dyn Send + Sync + Any; pub trait Updateable: 'static + Send + Sync { fn id(&self) -> Snowflake; } - -/// Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to -/// an Observable. The Observer is notified when the Observable's data changes. -/// In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. -/// Note that `Debug` is used to tell `Observer`s apart when unsubscribing. -#[async_trait] -pub trait Observer: Sync + Send + std::fmt::Debug { - async fn update(&self, data: &T); -} - -/// GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a -/// change in the WebSocketEvent. GatewayEvents are observable. -#[derive(Default, Debug)] -pub struct GatewayEvent { - observers: Vec>>, -} - -impl GatewayEvent { - pub fn new() -> Self { - Self { - observers: Vec::new(), - } - } - - /// Returns true if the GatewayEvent is observed by at least one Observer. - pub fn is_observed(&self) -> bool { - !self.observers.is_empty() - } - - /// Subscribes an Observer to the GatewayEvent. - pub fn subscribe(&mut self, observable: Arc>) { - self.observers.push(observable); - } - - /// Unsubscribes an Observer from the GatewayEvent. - pub fn unsubscribe(&mut self, observable: &dyn Observer) { - // .retain()'s closure retains only those elements of the vector, which have a different - // pointer value than observable. - // The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same - // anddd there is no way to do that without using format - let to_remove = format!("{:?}", observable); - self.observers - .retain(|obs| format!("{:?}", obs) != to_remove); - } - - /// Notifies the observers of the GatewayEvent. - pub(crate) async fn notify(&self, new_event_data: T) { - for observer in &self.observers { - observer.update(&new_event_data).await; - } - } -} - diff --git a/src/voice/gateway/events.rs b/src/voice/gateway/events.rs index af043b3..90de2ce 100644 --- a/src/voice/gateway/events.rs +++ b/src/voice/gateway/events.rs @@ -14,15 +14,15 @@ use crate::{ #[derive(Default, Debug)] pub struct VoiceEvents { - pub voice_ready: GatewayEvent, - pub backend_version: GatewayEvent, - pub session_description: GatewayEvent, - pub session_update: GatewayEvent, - pub speaking: GatewayEvent, - pub ssrc_definition: GatewayEvent, - pub client_disconnect: GatewayEvent, - pub client_connect_flags: GatewayEvent, - pub client_connect_platform: GatewayEvent, - pub media_sink_wants: GatewayEvent, - pub error: GatewayEvent, + pub voice_ready: Publisher, + pub backend_version: Publisher, + pub session_description: Publisher, + pub session_update: Publisher, + pub speaking: Publisher, + pub ssrc_definition: Publisher, + pub client_disconnect: Publisher, + pub client_connect_flags: Publisher, + pub client_connect_platform: Publisher, + pub media_sink_wants: Publisher, + pub error: Publisher, } diff --git a/src/voice/gateway/gateway.rs b/src/voice/gateway/gateway.rs index 6ff2a37..f1bde91 100644 --- a/src/voice/gateway/gateway.rs +++ b/src/voice/gateway/gateway.rs @@ -160,7 +160,7 @@ impl VoiceGateway { /// (Called for every event in handle_message) async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>( data: &'a str, - event: &mut GatewayEvent, + event: &mut Publisher, ) -> Result<(), serde_json::Error> { let data_deserialize_result: Result = serde_json::from_str(data); @@ -168,7 +168,7 @@ impl VoiceGateway { return Err(data_deserialize_result.err().unwrap()); } - event.notify(data_deserialize_result.unwrap()).await; + event.publish(data_deserialize_result.unwrap()).await; Ok(()) } @@ -182,7 +182,7 @@ impl VoiceGateway { if let Some(error) = msg.error() { warn!("GW: Received error {:?}, connection will close..", error); self.close().await; - self.events.lock().await.error.notify(error).await; + self.events.lock().await.error.publish(error).await; } else { warn!( "Message unrecognised: {:?}, please open an issue on the chorus github", diff --git a/src/voice/udp/events.rs b/src/voice/udp/events.rs index d4917fe..dfa26e7 100644 --- a/src/voice/udp/events.rs +++ b/src/voice/udp/events.rs @@ -11,8 +11,8 @@ impl WebSocketEvent for Rtcp {} #[derive(Debug)] pub struct VoiceUDPEvents { - pub rtp: GatewayEvent, - pub rtcp: GatewayEvent, + pub rtp: Publisher, + pub rtcp: Publisher, } impl Default for VoiceUDPEvents { diff --git a/src/voice/udp/handler.rs b/src/voice/udp/handler.rs index c21709b..df2a57c 100644 --- a/src/voice/udp/handler.rs +++ b/src/voice/udp/handler.rs @@ -214,7 +214,7 @@ impl UdpHandler { .lock() .await .rtp - .notify(rtp_with_decrypted_data) + .publish(rtp_with_decrypted_data) .await; } Demuxed::Rtcp(rtcp) => { @@ -251,7 +251,7 @@ impl UdpHandler { } }; - self.events.lock().await.rtcp.notify(rtcp_data).await; + self.events.lock().await.rtcp.publish(rtcp_data).await; } Demuxed::FailedParse(e) => { trace!("VUDP: Failed to parse packet: {:?}", e); diff --git a/tests/gateway.rs b/tests/gateway.rs index 9991124..b16b3b2 100644 --- a/tests/gateway.rs +++ b/tests/gateway.rs @@ -14,6 +14,7 @@ use chorus::types::{ self, Channel, ChannelCreateSchema, ChannelModifySchema, GatewayReady, IntoShared, RoleCreateModifySchema, RoleObject, }; +use pubserve::Subscriber; #[cfg(target_arch = "wasm32")] use wasm_bindgen_test::*; #[cfg(target_arch = "wasm32")] @@ -30,7 +31,9 @@ use wasmtimer::tokio::sleep; async fn test_gateway_establish() { let bundle = common::setup().await; - let _: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()).await.unwrap(); + let _: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()) + .await + .unwrap(); common::teardown(bundle).await } @@ -40,7 +43,7 @@ struct GatewayReadyObserver { } #[async_trait] -impl Observer for GatewayReadyObserver { +impl Subscriber for GatewayReadyObserver { async fn update(&self, _data: &GatewayReady) { self.channel.send(()).await.unwrap(); } @@ -52,7 +55,9 @@ impl Observer for GatewayReadyObserver { async fn test_gateway_authenticate() { let bundle = common::setup().await; - let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()).await.unwrap(); + let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()) + .await + .unwrap(); let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1);