From 1df01512d754e972341ecf6556b16712059c2fa8 Mon Sep 17 00:00:00 2001 From: Vincent Junge Date: Tue, 20 Jun 2023 14:42:50 +0200 Subject: [PATCH] simplify GatewayEvent --- examples/gateway_observers.rs | 11 +-- src/gateway.rs | 171 ++++++++++++---------------------- src/types/events/resume.rs | 2 +- tests/message.rs | 17 ++++ 4 files changed, 80 insertions(+), 121 deletions(-) diff --git a/examples/gateway_observers.rs b/examples/gateway_observers.rs index 6e71751..d8762e0 100644 --- a/examples/gateway_observers.rs +++ b/examples/gateway_observers.rs @@ -4,7 +4,7 @@ use chorus::{ types::{GatewayIdentifyPayload, GatewayReady}, }; use std::{sync::Arc, time::Duration}; -use tokio::{self, sync::Mutex, time::sleep}; +use tokio::{self, time::sleep}; // This example creates a simple gateway connection and a basic observer struct @@ -17,7 +17,7 @@ pub struct ExampleObserver {} // One struct can be an observer of multiple websocketevents, if needed impl Observer for ExampleObserver { // After we subscribe to an event this function is called every time we receive it - fn update(&mut self, _data: &GatewayReady) { + fn update(&self, _data: &GatewayReady) { println!("Observed Ready!"); } } @@ -33,8 +33,8 @@ async fn main() { // Create an instance of our observer let observer = ExampleObserver {}; - // Because observers have to reside in between the main and gateway thread, (they have to be shared between both) we need to put them in an Arc - let shared_observer = Arc::new(Mutex::new(observer)); + // Share ownership of the observer with the gateway + let shared_observer = Arc::new(observer); // Subscribe our observer to the Ready event on this gateway // From now on observer.update(data) will be called every time we receive the Ready event @@ -44,8 +44,7 @@ async fn main() { .await .session .ready - .subscribe(shared_observer) - .unwrap(); + .subscribe(shared_observer); // Authenticate so we will receive any events let token = "SecretToken".to_string(); diff --git a/src/gateway.rs b/src/gateway.rs index 5cececa..7ead0ce 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -3,6 +3,7 @@ use crate::errors::ObserverError; use crate::gateway::events::Events; use crate::types; use crate::types::WebSocketEvent; +use std::any::Any; use std::sync::Arc; use futures_util::stream::SplitSink; @@ -72,11 +73,9 @@ const GATEWAY_LAZY_REQUEST: u8 = 14; /// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms const HEARTBEAT_ACK_TIMEOUT: u128 = 2000; +/// Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError]. +/// This struct is used internally when handling messages. #[derive(Clone, Debug)] -/** -Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError]. -This struct is used internally when handling messages. -*/ pub struct GatewayMessage { /// The message we received from the server message: tokio_tungstenite::tungstenite::Message, @@ -148,13 +147,11 @@ impl GatewayMessage { } } +/// Represents a handle to a Gateway connection. A Gateway connection will create observable +/// [`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently +/// implemented [Types] with the trait [`WebSocketEvent`] +/// Using this handle you can also send Gateway Events directly. #[derive(Debug)] -/** -Represents a handle to a Gateway connection. A Gateway connection will create observable -[`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently -implemented [Types] with the trait [`WebSocketEvent`] -Using this handle you can also send Gateway Events directly. - */ pub struct GatewayHandle { pub url: String, pub events: Arc>, @@ -395,7 +392,7 @@ impl Gateway { return Err(data_deserialize_result.err().unwrap()); } - event.update_data(data_deserialize_result.unwrap()).await; + event.notify(data_deserialize_result.unwrap()).await; Ok(()) } @@ -1370,13 +1367,7 @@ impl Gateway { sessions: result.unwrap(), }; - self.events - .lock() - .await - .session - .replace - .update_data(data) - .await; + self.events.lock().await.session.replace.notify(data).await; } "USER_UPDATE" => { let event = &mut self.events.lock().await.user.update; @@ -1532,9 +1523,7 @@ impl Gateway { } } -/** -Handles sending heartbeats to the gateway in another thread - */ +/// Handles sending heartbeats to the gateway in another thread struct HeartbeatHandler { /// The heartbeat interval in milliseconds pub heartbeat_interval: u128, @@ -1669,10 +1658,8 @@ impl HeartbeatHandler { } } -/** -Used for communications between the heartbeat and gateway thread. -Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server -*/ +/// Used for communications between the heartbeat and gateway thread. +/// Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server #[derive(Clone, Copy, Debug)] struct HeartbeatThreadCommunication { /// The opcode for the communication we received, if relevant @@ -1681,89 +1668,47 @@ struct HeartbeatThreadCommunication { sequence_number: Option, } -/** -Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to -an Observable. The Observer is notified when the Observable's data changes. -In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. - */ -pub trait Observer: std::fmt::Debug { - fn update(&mut self, data: &T); +/// Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to +/// an Observable. The Observer is notified when the Observable's data changes. +/// In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. +/// Note that `Debug` is used to tell `Observer`s apart when unsubscribing. +pub trait Observer: Sync + Send + std::fmt::Debug { + fn update(&self, data: &T); } -/** GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a -change in the WebSocketEvent. GatewayEvents are observable. - */ +/// GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a +/// change in the WebSocketEvent. GatewayEvents are observable. #[derive(Default, Debug)] -pub struct GatewayEvent { - observers: Vec + Sync + Send>>>, - pub event_data: T, - pub is_observed: bool, +pub struct GatewayEvent { + observers: Vec>>, } -impl GatewayEvent { - fn new(event_data: T) -> Self { - Self { - is_observed: false, - observers: Vec::new(), - event_data, - } - } - - /** - Returns true if the GatewayEvent is observed by at least one Observer. - */ +impl GatewayEvent { + /// Returns true if the GatewayEvent is observed by at least one Observer. pub fn is_observed(&self) -> bool { - self.is_observed + !self.observers.is_empty() } - /** - Subscribes an Observer to the GatewayEvent. Returns an error if the GatewayEvent is already - observed. - # Errors - Returns an error if the GatewayEvent is already observed. - Error type: [`ObserverError::AlreadySubscribedError`] - */ - pub fn subscribe( - &mut self, - observable: Arc + Sync + Send>>, - ) -> Result<(), ObserverError> { - if self.is_observed { - return Err(ObserverError::AlreadySubscribedError); - } - self.is_observed = true; + /// Subscribes an Observer to the GatewayEvent. + pub fn subscribe(&mut self, observable: Arc>) { self.observers.push(observable); - Ok(()) } - /** - Unsubscribes an Observer from the GatewayEvent. - */ - pub fn unsubscribe(&mut self, observable: Arc + Sync + Send>>) { + /// Unsubscribes an Observer from the GatewayEvent. + pub fn unsubscribe(&mut self, observable: &dyn Observer) { // .retain()'s closure retains only those elements of the vector, which have a different // pointer value than observable. // The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same // anddd there is no way to do that without using format + let to_remove = format!("{:?}", observable); self.observers - .retain(|obs| format!("{:?}", obs) != format!("{:?}", &observable)); - self.is_observed = !self.observers.is_empty(); + .retain(|obs| format!("{:?}", obs) != to_remove); } - /** - Updates the GatewayEvent's data and notifies the observers. - */ - async fn update_data(&mut self, new_event_data: T) { - self.event_data = new_event_data; - self.notify().await; - } - - /** - Notifies the observers of the GatewayEvent. - */ - async fn notify(&self) { + /// Notifies the observers of the GatewayEvent. + async fn notify(&self, new_event_data: T) { for observer in &self.observers { - let mut observer_lock = observer.lock().await; - observer_lock.update(&self.event_data); - drop(observer_lock); + observer.update(&new_event_data); } } } @@ -1932,23 +1877,23 @@ mod events { #[cfg(test)] mod example { use super::*; + use std::sync::atomic::{AtomicI32, Ordering::Relaxed}; #[derive(Debug)] - struct Consumer; + struct Consumer { + name: String, + events_received: AtomicI32, + } impl Observer for Consumer { - fn update(&mut self, data: &types::GatewayResume) { - println!("{}", data.token) + fn update(&self, _data: &types::GatewayResume) { + self.events_received.fetch_add(1, Relaxed); } } #[tokio::test] async fn test_observer_behavior() { - let mut event = GatewayEvent::new(types::GatewayResume { - token: "start".to_string(), - session_id: "start".to_string(), - seq: "start".to_string(), - }); + let mut event = GatewayEvent::default(); let new_data = types::GatewayResume { token: "token_3276ha37am3".to_string(), @@ -1956,25 +1901,23 @@ mod example { seq: "3".to_string(), }; - let consumer = Consumer; - let arc_mut_consumer = Arc::new(Mutex::new(consumer)); + let consumer = Arc::new(Consumer { + name: "first".into(), + events_received: 0.into(), + }); + event.subscribe(consumer.clone()); - event.subscribe(arc_mut_consumer.clone()).unwrap(); + let second_consumer = Arc::new(Consumer { + name: "second".into(), + events_received: 0.into(), + }); + event.subscribe(second_consumer.clone()); - event.notify().await; + event.notify(new_data.clone()).await; + event.unsubscribe(&*consumer); + event.notify(new_data).await; - event.update_data(new_data).await; - - let second_consumer = Consumer; - let arc_mut_second_consumer = Arc::new(Mutex::new(second_consumer)); - - match event.subscribe(arc_mut_second_consumer.clone()).err() { - None => panic!(), - Some(err) => println!("You cannot subscribe twice: {}", err), - } - - event.unsubscribe(arc_mut_consumer.clone()); - - event.subscribe(arc_mut_second_consumer).unwrap(); + assert_eq!(consumer.events_received.load(Relaxed), 1); + assert_eq!(second_consumer.events_received.load(Relaxed), 2); } } diff --git a/src/types/events/resume.rs b/src/types/events/resume.rs index 362de98..45d2235 100644 --- a/src/types/events/resume.rs +++ b/src/types/events/resume.rs @@ -1,7 +1,7 @@ use crate::types::events::WebSocketEvent; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize, Serialize, Default)] +#[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct GatewayResume { pub token: String, pub session_id: String, diff --git a/tests/message.rs b/tests/message.rs index e92c35c..7f1da9d 100644 --- a/tests/message.rs +++ b/tests/message.rs @@ -64,3 +64,20 @@ async fn send_message_attachment() { .unwrap(); common::teardown(bundle).await } + +#[tokio::test] +async fn read_messages() { + let mut bundle = common::setup().await; + + // First create some messages to read + let mut message = types::MessageSendSchema { + content: Some("A Message!".to_string()), + ..Default::default() + }; + let _ = bundle + .user + .send_message(&mut message, bundle.channel.id.to_string(), None) + .await + .unwrap(); + common::teardown(bundle).await +}