simplify GatewayEvent

This commit is contained in:
Vincent Junge 2023-06-20 14:42:50 +02:00
parent aae478543d
commit 1df01512d7
4 changed files with 80 additions and 121 deletions

View File

@ -4,7 +4,7 @@ use chorus::{
types::{GatewayIdentifyPayload, GatewayReady}, types::{GatewayIdentifyPayload, GatewayReady},
}; };
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use tokio::{self, sync::Mutex, time::sleep}; use tokio::{self, time::sleep};
// This example creates a simple gateway connection and a basic observer struct // This example creates a simple gateway connection and a basic observer struct
@ -17,7 +17,7 @@ pub struct ExampleObserver {}
// One struct can be an observer of multiple websocketevents, if needed // One struct can be an observer of multiple websocketevents, if needed
impl Observer<GatewayReady> for ExampleObserver { impl Observer<GatewayReady> for ExampleObserver {
// After we subscribe to an event this function is called every time we receive it // After we subscribe to an event this function is called every time we receive it
fn update(&mut self, _data: &GatewayReady) { fn update(&self, _data: &GatewayReady) {
println!("Observed Ready!"); println!("Observed Ready!");
} }
} }
@ -33,8 +33,8 @@ async fn main() {
// Create an instance of our observer // Create an instance of our observer
let observer = ExampleObserver {}; let observer = ExampleObserver {};
// Because observers have to reside in between the main and gateway thread, (they have to be shared between both) we need to put them in an Arc<Mutex> // Share ownership of the observer with the gateway
let shared_observer = Arc::new(Mutex::new(observer)); let shared_observer = Arc::new(observer);
// Subscribe our observer to the Ready event on this gateway // Subscribe our observer to the Ready event on this gateway
// From now on observer.update(data) will be called every time we receive the Ready event // From now on observer.update(data) will be called every time we receive the Ready event
@ -44,8 +44,7 @@ async fn main() {
.await .await
.session .session
.ready .ready
.subscribe(shared_observer) .subscribe(shared_observer);
.unwrap();
// Authenticate so we will receive any events // Authenticate so we will receive any events
let token = "SecretToken".to_string(); let token = "SecretToken".to_string();

View File

@ -3,6 +3,7 @@ use crate::errors::ObserverError;
use crate::gateway::events::Events; use crate::gateway::events::Events;
use crate::types; use crate::types;
use crate::types::WebSocketEvent; use crate::types::WebSocketEvent;
use std::any::Any;
use std::sync::Arc; use std::sync::Arc;
use futures_util::stream::SplitSink; use futures_util::stream::SplitSink;
@ -72,11 +73,9 @@ const GATEWAY_LAZY_REQUEST: u8 = 14;
/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms /// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms
const HEARTBEAT_ACK_TIMEOUT: u128 = 2000; const HEARTBEAT_ACK_TIMEOUT: u128 = 2000;
/// Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError].
/// This struct is used internally when handling messages.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/**
Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError].
This struct is used internally when handling messages.
*/
pub struct GatewayMessage { pub struct GatewayMessage {
/// The message we received from the server /// The message we received from the server
message: tokio_tungstenite::tungstenite::Message, message: tokio_tungstenite::tungstenite::Message,
@ -148,13 +147,11 @@ impl GatewayMessage {
} }
} }
/// Represents a handle to a Gateway connection. A Gateway connection will create observable
/// [`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently
/// implemented [Types] with the trait [`WebSocketEvent`]
/// Using this handle you can also send Gateway Events directly.
#[derive(Debug)] #[derive(Debug)]
/**
Represents a handle to a Gateway connection. A Gateway connection will create observable
[`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently
implemented [Types] with the trait [`WebSocketEvent`]
Using this handle you can also send Gateway Events directly.
*/
pub struct GatewayHandle { pub struct GatewayHandle {
pub url: String, pub url: String,
pub events: Arc<Mutex<Events>>, pub events: Arc<Mutex<Events>>,
@ -395,7 +392,7 @@ impl Gateway {
return Err(data_deserialize_result.err().unwrap()); return Err(data_deserialize_result.err().unwrap());
} }
event.update_data(data_deserialize_result.unwrap()).await; event.notify(data_deserialize_result.unwrap()).await;
Ok(()) Ok(())
} }
@ -1370,13 +1367,7 @@ impl Gateway {
sessions: result.unwrap(), sessions: result.unwrap(),
}; };
self.events self.events.lock().await.session.replace.notify(data).await;
.lock()
.await
.session
.replace
.update_data(data)
.await;
} }
"USER_UPDATE" => { "USER_UPDATE" => {
let event = &mut self.events.lock().await.user.update; let event = &mut self.events.lock().await.user.update;
@ -1532,9 +1523,7 @@ impl Gateway {
} }
} }
/** /// Handles sending heartbeats to the gateway in another thread
Handles sending heartbeats to the gateway in another thread
*/
struct HeartbeatHandler { struct HeartbeatHandler {
/// The heartbeat interval in milliseconds /// The heartbeat interval in milliseconds
pub heartbeat_interval: u128, pub heartbeat_interval: u128,
@ -1669,10 +1658,8 @@ impl HeartbeatHandler {
} }
} }
/** /// Used for communications between the heartbeat and gateway thread.
Used for communications between the heartbeat and gateway thread. /// Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server
Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server
*/
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
struct HeartbeatThreadCommunication { struct HeartbeatThreadCommunication {
/// The opcode for the communication we received, if relevant /// The opcode for the communication we received, if relevant
@ -1681,89 +1668,47 @@ struct HeartbeatThreadCommunication {
sequence_number: Option<u64>, sequence_number: Option<u64>,
} }
/** /// Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to /// an Observable. The Observer is notified when the Observable's data changes.
an Observable. The Observer is notified when the Observable's data changes. /// In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. /// Note that `Debug` is used to tell `Observer`s apart when unsubscribing.
*/ pub trait Observer<T>: Sync + Send + std::fmt::Debug {
pub trait Observer<T: types::WebSocketEvent>: std::fmt::Debug { fn update(&self, data: &T);
fn update(&mut self, data: &T);
} }
/** GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a /// GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
change in the WebSocketEvent. GatewayEvents are observable. /// change in the WebSocketEvent. GatewayEvents are observable.
*/
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct GatewayEvent<T: types::WebSocketEvent> { pub struct GatewayEvent<T: WebSocketEvent> {
observers: Vec<Arc<Mutex<dyn Observer<T> + Sync + Send>>>, observers: Vec<Arc<dyn Observer<T>>>,
pub event_data: T,
pub is_observed: bool,
} }
impl<T: types::WebSocketEvent> GatewayEvent<T> { impl<T: WebSocketEvent> GatewayEvent<T> {
fn new(event_data: T) -> Self { /// Returns true if the GatewayEvent is observed by at least one Observer.
Self {
is_observed: false,
observers: Vec::new(),
event_data,
}
}
/**
Returns true if the GatewayEvent is observed by at least one Observer.
*/
pub fn is_observed(&self) -> bool { pub fn is_observed(&self) -> bool {
self.is_observed !self.observers.is_empty()
} }
/** /// Subscribes an Observer to the GatewayEvent.
Subscribes an Observer to the GatewayEvent. Returns an error if the GatewayEvent is already pub fn subscribe(&mut self, observable: Arc<dyn Observer<T>>) {
observed.
# Errors
Returns an error if the GatewayEvent is already observed.
Error type: [`ObserverError::AlreadySubscribedError`]
*/
pub fn subscribe(
&mut self,
observable: Arc<Mutex<dyn Observer<T> + Sync + Send>>,
) -> Result<(), ObserverError> {
if self.is_observed {
return Err(ObserverError::AlreadySubscribedError);
}
self.is_observed = true;
self.observers.push(observable); self.observers.push(observable);
Ok(())
} }
/** /// Unsubscribes an Observer from the GatewayEvent.
Unsubscribes an Observer from the GatewayEvent. pub fn unsubscribe(&mut self, observable: &dyn Observer<T>) {
*/
pub fn unsubscribe(&mut self, observable: Arc<Mutex<dyn Observer<T> + Sync + Send>>) {
// .retain()'s closure retains only those elements of the vector, which have a different // .retain()'s closure retains only those elements of the vector, which have a different
// pointer value than observable. // pointer value than observable.
// The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same // The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
// anddd there is no way to do that without using format // anddd there is no way to do that without using format
let to_remove = format!("{:?}", observable);
self.observers self.observers
.retain(|obs| format!("{:?}", obs) != format!("{:?}", &observable)); .retain(|obs| format!("{:?}", obs) != to_remove);
self.is_observed = !self.observers.is_empty();
} }
/** /// Notifies the observers of the GatewayEvent.
Updates the GatewayEvent's data and notifies the observers. async fn notify(&self, new_event_data: T) {
*/
async fn update_data(&mut self, new_event_data: T) {
self.event_data = new_event_data;
self.notify().await;
}
/**
Notifies the observers of the GatewayEvent.
*/
async fn notify(&self) {
for observer in &self.observers { for observer in &self.observers {
let mut observer_lock = observer.lock().await; observer.update(&new_event_data);
observer_lock.update(&self.event_data);
drop(observer_lock);
} }
} }
} }
@ -1932,23 +1877,23 @@ mod events {
#[cfg(test)] #[cfg(test)]
mod example { mod example {
use super::*; use super::*;
use std::sync::atomic::{AtomicI32, Ordering::Relaxed};
#[derive(Debug)] #[derive(Debug)]
struct Consumer; struct Consumer {
name: String,
events_received: AtomicI32,
}
impl Observer<types::GatewayResume> for Consumer { impl Observer<types::GatewayResume> for Consumer {
fn update(&mut self, data: &types::GatewayResume) { fn update(&self, _data: &types::GatewayResume) {
println!("{}", data.token) self.events_received.fetch_add(1, Relaxed);
} }
} }
#[tokio::test] #[tokio::test]
async fn test_observer_behavior() { async fn test_observer_behavior() {
let mut event = GatewayEvent::new(types::GatewayResume { let mut event = GatewayEvent::default();
token: "start".to_string(),
session_id: "start".to_string(),
seq: "start".to_string(),
});
let new_data = types::GatewayResume { let new_data = types::GatewayResume {
token: "token_3276ha37am3".to_string(), token: "token_3276ha37am3".to_string(),
@ -1956,25 +1901,23 @@ mod example {
seq: "3".to_string(), seq: "3".to_string(),
}; };
let consumer = Consumer; let consumer = Arc::new(Consumer {
let arc_mut_consumer = Arc::new(Mutex::new(consumer)); name: "first".into(),
events_received: 0.into(),
});
event.subscribe(consumer.clone());
event.subscribe(arc_mut_consumer.clone()).unwrap(); let second_consumer = Arc::new(Consumer {
name: "second".into(),
events_received: 0.into(),
});
event.subscribe(second_consumer.clone());
event.notify().await; event.notify(new_data.clone()).await;
event.unsubscribe(&*consumer);
event.notify(new_data).await;
event.update_data(new_data).await; assert_eq!(consumer.events_received.load(Relaxed), 1);
assert_eq!(second_consumer.events_received.load(Relaxed), 2);
let second_consumer = Consumer;
let arc_mut_second_consumer = Arc::new(Mutex::new(second_consumer));
match event.subscribe(arc_mut_second_consumer.clone()).err() {
None => panic!(),
Some(err) => println!("You cannot subscribe twice: {}", err),
}
event.unsubscribe(arc_mut_consumer.clone());
event.subscribe(arc_mut_second_consumer).unwrap();
} }
} }

View File

@ -1,7 +1,7 @@
use crate::types::events::WebSocketEvent; use crate::types::events::WebSocketEvent;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)] #[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct GatewayResume { pub struct GatewayResume {
pub token: String, pub token: String,
pub session_id: String, pub session_id: String,

View File

@ -64,3 +64,20 @@ async fn send_message_attachment() {
.unwrap(); .unwrap();
common::teardown(bundle).await common::teardown(bundle).await
} }
#[tokio::test]
async fn read_messages() {
let mut bundle = common::setup().await;
// First create some messages to read
let mut message = types::MessageSendSchema {
content: Some("A Message!".to_string()),
..Default::default()
};
let _ = bundle
.user
.send_message(&mut message, bundle.channel.id.to_string(), None)
.await
.unwrap();
common::teardown(bundle).await
}