Replace `Observer` and `GatewayEvent` with `pubserve` crate

This commit is contained in:
bitfl0wer 2024-07-13 14:28:22 +02:00
parent 484c69229d
commit 7554f90187
No known key found for this signature in database
GPG Key ID: 0ACD574FCF5226CF
11 changed files with 129 additions and 161 deletions

10
Cargo.lock generated
View File

@ -233,6 +233,7 @@ dependencies = [
"lazy_static",
"log",
"poem",
"pubserve",
"rand",
"regex",
"reqwest",
@ -1594,6 +1595,15 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "pubserve"
version = "1.1.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1781b2a51798c98a381e839e61bc5ce6426bd89bb9c3f9142de2086a80591cd"
dependencies = [
"async-trait",
]
[[package]]
name = "quote"
version = "1.0.36"

View File

@ -66,6 +66,7 @@ crypto_secretbox = { version = "0.1.1", optional = true }
rand = "0.8.5"
flate2 = { version = "1.0.30", optional = true }
webpki-roots = "0.26.3"
pubserve = { version = "1.1.0-alpha.1", features = ["async", "send"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
rustls = "0.21.10"

View File

@ -17,9 +17,9 @@ use async_trait::async_trait;
use chorus::gateway::{Gateway, GatewayOptions};
use chorus::{
self,
gateway::Observer,
types::{GatewayIdentifyPayload, GatewayReady},
};
use pubserve::Subscriber;
use std::{sync::Arc, time::Duration};
use tokio::{self};
@ -38,7 +38,7 @@ pub struct ExampleObserver {}
// The Observer trait can be implemented for a struct for a given websocketevent to handle observing it
// One struct can be an observer of multiple websocketevents, if needed
#[async_trait]
impl Observer<GatewayReady> for ExampleObserver {
impl Subscriber<GatewayReady> for ExampleObserver {
// After we subscribe to an event this function is called every time we receive it
async fn update(&self, _data: &GatewayReady) {
println!("Observed Ready!");
@ -56,7 +56,9 @@ async fn main() {
let options = GatewayOptions::default();
// Initiate the gateway connection
let gateway = Gateway::spawn(gateway_websocket_url, options).await.unwrap();
let gateway = Gateway::spawn(gateway_websocket_url, options)
.await
.unwrap();
// Create an instance of our observer
let observer = ExampleObserver {};

View File

@ -2,6 +2,8 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use pubserve::Publisher;
use super::*;
use crate::types;
@ -23,144 +25,144 @@ pub struct Events {
pub call: Call,
pub voice: Voice,
pub webhooks: Webhooks,
pub gateway_identify_payload: GatewayEvent<types::GatewayIdentifyPayload>,
pub gateway_resume: GatewayEvent<types::GatewayResume>,
pub error: GatewayEvent<GatewayError>,
pub gateway_identify_payload: Publisher<types::GatewayIdentifyPayload>,
pub gateway_resume: Publisher<types::GatewayResume>,
pub error: Publisher<GatewayError>,
}
#[derive(Default, Debug)]
pub struct Application {
pub command_permissions_update: GatewayEvent<types::ApplicationCommandPermissionsUpdate>,
pub command_permissions_update: Publisher<types::ApplicationCommandPermissionsUpdate>,
}
#[derive(Default, Debug)]
pub struct AutoModeration {
pub rule_create: GatewayEvent<types::AutoModerationRuleCreate>,
pub rule_update: GatewayEvent<types::AutoModerationRuleUpdate>,
pub rule_delete: GatewayEvent<types::AutoModerationRuleDelete>,
pub action_execution: GatewayEvent<types::AutoModerationActionExecution>,
pub rule_create: Publisher<types::AutoModerationRuleCreate>,
pub rule_update: Publisher<types::AutoModerationRuleUpdate>,
pub rule_delete: Publisher<types::AutoModerationRuleDelete>,
pub action_execution: Publisher<types::AutoModerationActionExecution>,
}
#[derive(Default, Debug)]
pub struct Session {
pub ready: GatewayEvent<types::GatewayReady>,
pub ready_supplemental: GatewayEvent<types::GatewayReadySupplemental>,
pub replace: GatewayEvent<types::SessionsReplace>,
pub reconnect: GatewayEvent<types::GatewayReconnect>,
pub invalid: GatewayEvent<types::GatewayInvalidSession>,
pub ready: Publisher<types::GatewayReady>,
pub ready_supplemental: Publisher<types::GatewayReadySupplemental>,
pub replace: Publisher<types::SessionsReplace>,
pub reconnect: Publisher<types::GatewayReconnect>,
pub invalid: Publisher<types::GatewayInvalidSession>,
}
#[derive(Default, Debug)]
pub struct StageInstance {
pub create: GatewayEvent<types::StageInstanceCreate>,
pub update: GatewayEvent<types::StageInstanceUpdate>,
pub delete: GatewayEvent<types::StageInstanceDelete>,
pub create: Publisher<types::StageInstanceCreate>,
pub update: Publisher<types::StageInstanceUpdate>,
pub delete: Publisher<types::StageInstanceDelete>,
}
#[derive(Default, Debug)]
pub struct Message {
pub create: GatewayEvent<types::MessageCreate>,
pub update: GatewayEvent<types::MessageUpdate>,
pub delete: GatewayEvent<types::MessageDelete>,
pub delete_bulk: GatewayEvent<types::MessageDeleteBulk>,
pub reaction_add: GatewayEvent<types::MessageReactionAdd>,
pub reaction_remove: GatewayEvent<types::MessageReactionRemove>,
pub reaction_remove_all: GatewayEvent<types::MessageReactionRemoveAll>,
pub reaction_remove_emoji: GatewayEvent<types::MessageReactionRemoveEmoji>,
pub ack: GatewayEvent<types::MessageACK>,
pub create: Publisher<types::MessageCreate>,
pub update: Publisher<types::MessageUpdate>,
pub delete: Publisher<types::MessageDelete>,
pub delete_bulk: Publisher<types::MessageDeleteBulk>,
pub reaction_add: Publisher<types::MessageReactionAdd>,
pub reaction_remove: Publisher<types::MessageReactionRemove>,
pub reaction_remove_all: Publisher<types::MessageReactionRemoveAll>,
pub reaction_remove_emoji: Publisher<types::MessageReactionRemoveEmoji>,
pub ack: Publisher<types::MessageACK>,
}
#[derive(Default, Debug)]
pub struct User {
pub update: GatewayEvent<types::UserUpdate>,
pub guild_settings_update: GatewayEvent<types::UserGuildSettingsUpdate>,
pub presence_update: GatewayEvent<types::PresenceUpdate>,
pub typing_start: GatewayEvent<types::TypingStartEvent>,
pub update: Publisher<types::UserUpdate>,
pub guild_settings_update: Publisher<types::UserGuildSettingsUpdate>,
pub presence_update: Publisher<types::PresenceUpdate>,
pub typing_start: Publisher<types::TypingStartEvent>,
}
#[derive(Default, Debug)]
pub struct Relationship {
pub add: GatewayEvent<types::RelationshipAdd>,
pub remove: GatewayEvent<types::RelationshipRemove>,
pub add: Publisher<types::RelationshipAdd>,
pub remove: Publisher<types::RelationshipRemove>,
}
#[derive(Default, Debug)]
pub struct Channel {
pub create: GatewayEvent<types::ChannelCreate>,
pub update: GatewayEvent<types::ChannelUpdate>,
pub unread_update: GatewayEvent<types::ChannelUnreadUpdate>,
pub delete: GatewayEvent<types::ChannelDelete>,
pub pins_update: GatewayEvent<types::ChannelPinsUpdate>,
pub create: Publisher<types::ChannelCreate>,
pub update: Publisher<types::ChannelUpdate>,
pub unread_update: Publisher<types::ChannelUnreadUpdate>,
pub delete: Publisher<types::ChannelDelete>,
pub pins_update: Publisher<types::ChannelPinsUpdate>,
}
#[derive(Default, Debug)]
pub struct Thread {
pub create: GatewayEvent<types::ThreadCreate>,
pub update: GatewayEvent<types::ThreadUpdate>,
pub delete: GatewayEvent<types::ThreadDelete>,
pub list_sync: GatewayEvent<types::ThreadListSync>,
pub member_update: GatewayEvent<types::ThreadMemberUpdate>,
pub members_update: GatewayEvent<types::ThreadMembersUpdate>,
pub create: Publisher<types::ThreadCreate>,
pub update: Publisher<types::ThreadUpdate>,
pub delete: Publisher<types::ThreadDelete>,
pub list_sync: Publisher<types::ThreadListSync>,
pub member_update: Publisher<types::ThreadMemberUpdate>,
pub members_update: Publisher<types::ThreadMembersUpdate>,
}
#[derive(Default, Debug)]
pub struct Guild {
pub create: GatewayEvent<types::GuildCreate>,
pub update: GatewayEvent<types::GuildUpdate>,
pub delete: GatewayEvent<types::GuildDelete>,
pub audit_log_entry_create: GatewayEvent<types::GuildAuditLogEntryCreate>,
pub ban_add: GatewayEvent<types::GuildBanAdd>,
pub ban_remove: GatewayEvent<types::GuildBanRemove>,
pub emojis_update: GatewayEvent<types::GuildEmojisUpdate>,
pub stickers_update: GatewayEvent<types::GuildStickersUpdate>,
pub integrations_update: GatewayEvent<types::GuildIntegrationsUpdate>,
pub member_add: GatewayEvent<types::GuildMemberAdd>,
pub member_remove: GatewayEvent<types::GuildMemberRemove>,
pub member_update: GatewayEvent<types::GuildMemberUpdate>,
pub members_chunk: GatewayEvent<types::GuildMembersChunk>,
pub role_create: GatewayEvent<types::GuildRoleCreate>,
pub role_update: GatewayEvent<types::GuildRoleUpdate>,
pub role_delete: GatewayEvent<types::GuildRoleDelete>,
pub role_scheduled_event_create: GatewayEvent<types::GuildScheduledEventCreate>,
pub role_scheduled_event_update: GatewayEvent<types::GuildScheduledEventUpdate>,
pub role_scheduled_event_delete: GatewayEvent<types::GuildScheduledEventDelete>,
pub role_scheduled_event_user_add: GatewayEvent<types::GuildScheduledEventUserAdd>,
pub role_scheduled_event_user_remove: GatewayEvent<types::GuildScheduledEventUserRemove>,
pub passive_update_v1: GatewayEvent<types::PassiveUpdateV1>,
pub create: Publisher<types::GuildCreate>,
pub update: Publisher<types::GuildUpdate>,
pub delete: Publisher<types::GuildDelete>,
pub audit_log_entry_create: Publisher<types::GuildAuditLogEntryCreate>,
pub ban_add: Publisher<types::GuildBanAdd>,
pub ban_remove: Publisher<types::GuildBanRemove>,
pub emojis_update: Publisher<types::GuildEmojisUpdate>,
pub stickers_update: Publisher<types::GuildStickersUpdate>,
pub integrations_update: Publisher<types::GuildIntegrationsUpdate>,
pub member_add: Publisher<types::GuildMemberAdd>,
pub member_remove: Publisher<types::GuildMemberRemove>,
pub member_update: Publisher<types::GuildMemberUpdate>,
pub members_chunk: Publisher<types::GuildMembersChunk>,
pub role_create: Publisher<types::GuildRoleCreate>,
pub role_update: Publisher<types::GuildRoleUpdate>,
pub role_delete: Publisher<types::GuildRoleDelete>,
pub role_scheduled_event_create: Publisher<types::GuildScheduledEventCreate>,
pub role_scheduled_event_update: Publisher<types::GuildScheduledEventUpdate>,
pub role_scheduled_event_delete: Publisher<types::GuildScheduledEventDelete>,
pub role_scheduled_event_user_add: Publisher<types::GuildScheduledEventUserAdd>,
pub role_scheduled_event_user_remove: Publisher<types::GuildScheduledEventUserRemove>,
pub passive_update_v1: Publisher<types::PassiveUpdateV1>,
}
#[derive(Default, Debug)]
pub struct Invite {
pub create: GatewayEvent<types::InviteCreate>,
pub delete: GatewayEvent<types::InviteDelete>,
pub create: Publisher<types::InviteCreate>,
pub delete: Publisher<types::InviteDelete>,
}
#[derive(Default, Debug)]
pub struct Integration {
pub create: GatewayEvent<types::IntegrationCreate>,
pub update: GatewayEvent<types::IntegrationUpdate>,
pub delete: GatewayEvent<types::IntegrationDelete>,
pub create: Publisher<types::IntegrationCreate>,
pub update: Publisher<types::IntegrationUpdate>,
pub delete: Publisher<types::IntegrationDelete>,
}
#[derive(Default, Debug)]
pub struct Interaction {
pub create: GatewayEvent<types::InteractionCreate>,
pub create: Publisher<types::InteractionCreate>,
}
#[derive(Default, Debug)]
pub struct Call {
pub create: GatewayEvent<types::CallCreate>,
pub update: GatewayEvent<types::CallUpdate>,
pub delete: GatewayEvent<types::CallDelete>,
pub create: Publisher<types::CallCreate>,
pub update: Publisher<types::CallUpdate>,
pub delete: Publisher<types::CallDelete>,
}
#[derive(Default, Debug)]
pub struct Voice {
pub state_update: GatewayEvent<types::VoiceStateUpdate>,
pub server_update: GatewayEvent<types::VoiceServerUpdate>,
pub state_update: Publisher<types::VoiceStateUpdate>,
pub server_update: Publisher<types::VoiceServerUpdate>,
}
#[derive(Default, Debug)]
pub struct Webhooks {
pub update: GatewayEvent<types::WebhooksUpdate>,
pub update: Publisher<types::WebhooksUpdate>,
}

View File

@ -7,6 +7,7 @@ use std::time::Duration;
use flate2::Decompress;
use futures_util::{SinkExt, StreamExt};
use log::*;
use pubserve::Publisher;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
@ -197,7 +198,7 @@ impl Gateway {
#[allow(dead_code)] // TODO: Remove this allow annotation
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
data: &'a str,
event: &mut GatewayEvent<T>,
event: &mut Publisher<T>,
) -> Result<(), serde_json::Error> {
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
@ -205,7 +206,7 @@ impl Gateway {
return Err(data_deserialize_result.err().unwrap());
}
event.notify(data_deserialize_result.unwrap()).await;
event.publish(data_deserialize_result.unwrap()).await;
Ok(())
}
@ -253,7 +254,7 @@ impl Gateway {
if let Some(error) = msg.error() {
warn!("GW: Received error {:?}, connection will close..", error);
self.close().await;
self.events.lock().await.error.notify(error).await;
self.events.lock().await.error.publish(error).await;
} else {
warn!(
"Message unrecognised: {:?}, please open an issue on the chorus github",
@ -292,7 +293,7 @@ impl Gateway {
let id = if message.id().is_some() {
message.id().unwrap()
} else {
event.notify(message).await;
event.publish(message).await;
return;
};
if let Some(to_update) = store.get(&id) {
@ -314,7 +315,7 @@ impl Gateway {
}
}
)?
event.notify(message).await;
event.publish(message).await;
}
}
},)*
@ -329,7 +330,7 @@ impl Gateway {
return;
}
Ok(sessions) => {
self.events.lock().await.session.replace.notify(
self.events.lock().await.session.replace.publish(
types::SessionsReplace {sessions}
).await;
}
@ -446,7 +447,7 @@ impl Gateway {
.await
.session
.reconnect
.notify(reconnect)
.publish(reconnect)
.await;
}
GATEWAY_INVALID_SESSION => {
@ -471,7 +472,7 @@ impl Gateway {
.await
.session
.invalid
.notify(invalid_session)
.publish(invalid_session)
.await;
}
// Starts our heartbeat

View File

@ -82,56 +82,3 @@ pub type ObservableObject = dyn Send + Sync + Any;
pub trait Updateable: 'static + Send + Sync {
fn id(&self) -> Snowflake;
}
/// Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
/// an Observable. The Observer is notified when the Observable's data changes.
/// In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
/// Note that `Debug` is used to tell `Observer`s apart when unsubscribing.
#[async_trait]
pub trait Observer<T>: Sync + Send + std::fmt::Debug {
async fn update(&self, data: &T);
}
/// GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
/// change in the WebSocketEvent. GatewayEvents are observable.
#[derive(Default, Debug)]
pub struct GatewayEvent<T: WebSocketEvent> {
observers: Vec<Arc<dyn Observer<T>>>,
}
impl<T: WebSocketEvent> GatewayEvent<T> {
pub fn new() -> Self {
Self {
observers: Vec::new(),
}
}
/// Returns true if the GatewayEvent is observed by at least one Observer.
pub fn is_observed(&self) -> bool {
!self.observers.is_empty()
}
/// Subscribes an Observer to the GatewayEvent.
pub fn subscribe(&mut self, observable: Arc<dyn Observer<T>>) {
self.observers.push(observable);
}
/// Unsubscribes an Observer from the GatewayEvent.
pub fn unsubscribe(&mut self, observable: &dyn Observer<T>) {
// .retain()'s closure retains only those elements of the vector, which have a different
// pointer value than observable.
// The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
// anddd there is no way to do that without using format
let to_remove = format!("{:?}", observable);
self.observers
.retain(|obs| format!("{:?}", obs) != to_remove);
}
/// Notifies the observers of the GatewayEvent.
pub(crate) async fn notify(&self, new_event_data: T) {
for observer in &self.observers {
observer.update(&new_event_data).await;
}
}
}

View File

@ -14,15 +14,15 @@ use crate::{
#[derive(Default, Debug)]
pub struct VoiceEvents {
pub voice_ready: GatewayEvent<VoiceReady>,
pub backend_version: GatewayEvent<VoiceBackendVersion>,
pub session_description: GatewayEvent<SessionDescription>,
pub session_update: GatewayEvent<SessionUpdate>,
pub speaking: GatewayEvent<Speaking>,
pub ssrc_definition: GatewayEvent<SsrcDefinition>,
pub client_disconnect: GatewayEvent<VoiceClientDisconnection>,
pub client_connect_flags: GatewayEvent<VoiceClientConnectFlags>,
pub client_connect_platform: GatewayEvent<VoiceClientConnectPlatform>,
pub media_sink_wants: GatewayEvent<VoiceMediaSinkWants>,
pub error: GatewayEvent<VoiceGatewayError>,
pub voice_ready: Publisher<VoiceReady>,
pub backend_version: Publisher<VoiceBackendVersion>,
pub session_description: Publisher<SessionDescription>,
pub session_update: Publisher<SessionUpdate>,
pub speaking: Publisher<Speaking>,
pub ssrc_definition: Publisher<SsrcDefinition>,
pub client_disconnect: Publisher<VoiceClientDisconnection>,
pub client_connect_flags: Publisher<VoiceClientConnectFlags>,
pub client_connect_platform: Publisher<VoiceClientConnectPlatform>,
pub media_sink_wants: Publisher<VoiceMediaSinkWants>,
pub error: Publisher<VoiceGatewayError>,
}

View File

@ -160,7 +160,7 @@ impl VoiceGateway {
/// (Called for every event in handle_message)
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
data: &'a str,
event: &mut GatewayEvent<T>,
event: &mut Publisher<T>,
) -> Result<(), serde_json::Error> {
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
@ -168,7 +168,7 @@ impl VoiceGateway {
return Err(data_deserialize_result.err().unwrap());
}
event.notify(data_deserialize_result.unwrap()).await;
event.publish(data_deserialize_result.unwrap()).await;
Ok(())
}
@ -182,7 +182,7 @@ impl VoiceGateway {
if let Some(error) = msg.error() {
warn!("GW: Received error {:?}, connection will close..", error);
self.close().await;
self.events.lock().await.error.notify(error).await;
self.events.lock().await.error.publish(error).await;
} else {
warn!(
"Message unrecognised: {:?}, please open an issue on the chorus github",

View File

@ -11,8 +11,8 @@ impl WebSocketEvent for Rtcp {}
#[derive(Debug)]
pub struct VoiceUDPEvents {
pub rtp: GatewayEvent<Rtp>,
pub rtcp: GatewayEvent<Rtcp>,
pub rtp: Publisher<Rtp>,
pub rtcp: Publisher<Rtcp>,
}
impl Default for VoiceUDPEvents {

View File

@ -214,7 +214,7 @@ impl UdpHandler {
.lock()
.await
.rtp
.notify(rtp_with_decrypted_data)
.publish(rtp_with_decrypted_data)
.await;
}
Demuxed::Rtcp(rtcp) => {
@ -251,7 +251,7 @@ impl UdpHandler {
}
};
self.events.lock().await.rtcp.notify(rtcp_data).await;
self.events.lock().await.rtcp.publish(rtcp_data).await;
}
Demuxed::FailedParse(e) => {
trace!("VUDP: Failed to parse packet: {:?}", e);

View File

@ -14,6 +14,7 @@ use chorus::types::{
self, Channel, ChannelCreateSchema, ChannelModifySchema, GatewayReady, IntoShared,
RoleCreateModifySchema, RoleObject,
};
use pubserve::Subscriber;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::*;
#[cfg(target_arch = "wasm32")]
@ -30,7 +31,9 @@ use wasmtimer::tokio::sleep;
async fn test_gateway_establish() {
let bundle = common::setup().await;
let _: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()).await.unwrap();
let _: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default())
.await
.unwrap();
common::teardown(bundle).await
}
@ -40,7 +43,7 @@ struct GatewayReadyObserver {
}
#[async_trait]
impl Observer<GatewayReady> for GatewayReadyObserver {
impl Subscriber<GatewayReady> for GatewayReadyObserver {
async fn update(&self, _data: &GatewayReady) {
self.channel.send(()).await.unwrap();
}
@ -52,7 +55,9 @@ impl Observer<GatewayReady> for GatewayReadyObserver {
async fn test_gateway_authenticate() {
let bundle = common::setup().await;
let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()).await.unwrap();
let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default())
.await
.unwrap();
let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1);