Merge pull request #117 from SpecificProtagonist/gateway-event

Simplify GatewayEvent
This commit is contained in:
Flori 2023-06-20 18:57:13 +02:00 committed by GitHub
commit 225b6e4798
4 changed files with 80 additions and 121 deletions

View File

@ -4,7 +4,7 @@ use chorus::{
types::{GatewayIdentifyPayload, GatewayReady},
};
use std::{sync::Arc, time::Duration};
use tokio::{self, sync::Mutex, time::sleep};
use tokio::{self, time::sleep};
// This example creates a simple gateway connection and a basic observer struct
@ -17,7 +17,7 @@ pub struct ExampleObserver {}
// One struct can be an observer of multiple websocketevents, if needed
impl Observer<GatewayReady> for ExampleObserver {
// After we subscribe to an event this function is called every time we receive it
fn update(&mut self, _data: &GatewayReady) {
fn update(&self, _data: &GatewayReady) {
println!("Observed Ready!");
}
}
@ -33,8 +33,8 @@ async fn main() {
// Create an instance of our observer
let observer = ExampleObserver {};
// Because observers have to reside in between the main and gateway thread, (they have to be shared between both) we need to put them in an Arc<Mutex>
let shared_observer = Arc::new(Mutex::new(observer));
// Share ownership of the observer with the gateway
let shared_observer = Arc::new(observer);
// Subscribe our observer to the Ready event on this gateway
// From now on observer.update(data) will be called every time we receive the Ready event
@ -44,8 +44,7 @@ async fn main() {
.await
.session
.ready
.subscribe(shared_observer)
.unwrap();
.subscribe(shared_observer);
// Authenticate so we will receive any events
let token = "SecretToken".to_string();

View File

@ -3,6 +3,7 @@ use crate::errors::ObserverError;
use crate::gateway::events::Events;
use crate::types;
use crate::types::WebSocketEvent;
use std::any::Any;
use std::sync::Arc;
use futures_util::stream::SplitSink;
@ -72,11 +73,9 @@ const GATEWAY_LAZY_REQUEST: u8 = 14;
/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms
const HEARTBEAT_ACK_TIMEOUT: u128 = 2000;
/// Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError].
/// This struct is used internally when handling messages.
#[derive(Clone, Debug)]
/**
Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError].
This struct is used internally when handling messages.
*/
pub struct GatewayMessage {
/// The message we received from the server
message: tokio_tungstenite::tungstenite::Message,
@ -148,13 +147,11 @@ impl GatewayMessage {
}
}
/// Represents a handle to a Gateway connection. A Gateway connection will create observable
/// [`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently
/// implemented [Types] with the trait [`WebSocketEvent`]
/// Using this handle you can also send Gateway Events directly.
#[derive(Debug)]
/**
Represents a handle to a Gateway connection. A Gateway connection will create observable
[`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently
implemented [Types] with the trait [`WebSocketEvent`]
Using this handle you can also send Gateway Events directly.
*/
pub struct GatewayHandle {
pub url: String,
pub events: Arc<Mutex<Events>>,
@ -395,7 +392,7 @@ impl Gateway {
return Err(data_deserialize_result.err().unwrap());
}
event.update_data(data_deserialize_result.unwrap()).await;
event.notify(data_deserialize_result.unwrap()).await;
Ok(())
}
@ -1370,13 +1367,7 @@ impl Gateway {
sessions: result.unwrap(),
};
self.events
.lock()
.await
.session
.replace
.update_data(data)
.await;
self.events.lock().await.session.replace.notify(data).await;
}
"USER_UPDATE" => {
let event = &mut self.events.lock().await.user.update;
@ -1532,9 +1523,7 @@ impl Gateway {
}
}
/**
Handles sending heartbeats to the gateway in another thread
*/
/// Handles sending heartbeats to the gateway in another thread
struct HeartbeatHandler {
/// The heartbeat interval in milliseconds
pub heartbeat_interval: u128,
@ -1669,10 +1658,8 @@ impl HeartbeatHandler {
}
}
/**
Used for communications between the heartbeat and gateway thread.
Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server
*/
/// Used for communications between the heartbeat and gateway thread.
/// Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server
#[derive(Clone, Copy, Debug)]
struct HeartbeatThreadCommunication {
/// The opcode for the communication we received, if relevant
@ -1681,89 +1668,47 @@ struct HeartbeatThreadCommunication {
sequence_number: Option<u64>,
}
/**
Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
an Observable. The Observer is notified when the Observable's data changes.
In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
*/
pub trait Observer<T: types::WebSocketEvent>: std::fmt::Debug {
fn update(&mut self, data: &T);
/// Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
/// an Observable. The Observer is notified when the Observable's data changes.
/// In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
/// Note that `Debug` is used to tell `Observer`s apart when unsubscribing.
pub trait Observer<T>: Sync + Send + std::fmt::Debug {
fn update(&self, data: &T);
}
/** GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
change in the WebSocketEvent. GatewayEvents are observable.
*/
/// GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
/// change in the WebSocketEvent. GatewayEvents are observable.
#[derive(Default, Debug)]
pub struct GatewayEvent<T: types::WebSocketEvent> {
observers: Vec<Arc<Mutex<dyn Observer<T> + Sync + Send>>>,
pub event_data: T,
pub is_observed: bool,
pub struct GatewayEvent<T: WebSocketEvent> {
observers: Vec<Arc<dyn Observer<T>>>,
}
impl<T: types::WebSocketEvent> GatewayEvent<T> {
fn new(event_data: T) -> Self {
Self {
is_observed: false,
observers: Vec::new(),
event_data,
}
}
/**
Returns true if the GatewayEvent is observed by at least one Observer.
*/
impl<T: WebSocketEvent> GatewayEvent<T> {
/// Returns true if the GatewayEvent is observed by at least one Observer.
pub fn is_observed(&self) -> bool {
self.is_observed
!self.observers.is_empty()
}
/**
Subscribes an Observer to the GatewayEvent. Returns an error if the GatewayEvent is already
observed.
# Errors
Returns an error if the GatewayEvent is already observed.
Error type: [`ObserverError::AlreadySubscribedError`]
*/
pub fn subscribe(
&mut self,
observable: Arc<Mutex<dyn Observer<T> + Sync + Send>>,
) -> Result<(), ObserverError> {
if self.is_observed {
return Err(ObserverError::AlreadySubscribedError);
}
self.is_observed = true;
/// Subscribes an Observer to the GatewayEvent.
pub fn subscribe(&mut self, observable: Arc<dyn Observer<T>>) {
self.observers.push(observable);
Ok(())
}
/**
Unsubscribes an Observer from the GatewayEvent.
*/
pub fn unsubscribe(&mut self, observable: Arc<Mutex<dyn Observer<T> + Sync + Send>>) {
/// Unsubscribes an Observer from the GatewayEvent.
pub fn unsubscribe(&mut self, observable: &dyn Observer<T>) {
// .retain()'s closure retains only those elements of the vector, which have a different
// pointer value than observable.
// The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
// anddd there is no way to do that without using format
let to_remove = format!("{:?}", observable);
self.observers
.retain(|obs| format!("{:?}", obs) != format!("{:?}", &observable));
self.is_observed = !self.observers.is_empty();
.retain(|obs| format!("{:?}", obs) != to_remove);
}
/**
Updates the GatewayEvent's data and notifies the observers.
*/
async fn update_data(&mut self, new_event_data: T) {
self.event_data = new_event_data;
self.notify().await;
}
/**
Notifies the observers of the GatewayEvent.
*/
async fn notify(&self) {
/// Notifies the observers of the GatewayEvent.
async fn notify(&self, new_event_data: T) {
for observer in &self.observers {
let mut observer_lock = observer.lock().await;
observer_lock.update(&self.event_data);
drop(observer_lock);
observer.update(&new_event_data);
}
}
}
@ -1932,23 +1877,23 @@ mod events {
#[cfg(test)]
mod example {
use super::*;
use std::sync::atomic::{AtomicI32, Ordering::Relaxed};
#[derive(Debug)]
struct Consumer;
struct Consumer {
name: String,
events_received: AtomicI32,
}
impl Observer<types::GatewayResume> for Consumer {
fn update(&mut self, data: &types::GatewayResume) {
println!("{}", data.token)
fn update(&self, _data: &types::GatewayResume) {
self.events_received.fetch_add(1, Relaxed);
}
}
#[tokio::test]
async fn test_observer_behavior() {
let mut event = GatewayEvent::new(types::GatewayResume {
token: "start".to_string(),
session_id: "start".to_string(),
seq: "start".to_string(),
});
let mut event = GatewayEvent::default();
let new_data = types::GatewayResume {
token: "token_3276ha37am3".to_string(),
@ -1956,25 +1901,23 @@ mod example {
seq: "3".to_string(),
};
let consumer = Consumer;
let arc_mut_consumer = Arc::new(Mutex::new(consumer));
let consumer = Arc::new(Consumer {
name: "first".into(),
events_received: 0.into(),
});
event.subscribe(consumer.clone());
event.subscribe(arc_mut_consumer.clone()).unwrap();
let second_consumer = Arc::new(Consumer {
name: "second".into(),
events_received: 0.into(),
});
event.subscribe(second_consumer.clone());
event.notify().await;
event.notify(new_data.clone()).await;
event.unsubscribe(&*consumer);
event.notify(new_data).await;
event.update_data(new_data).await;
let second_consumer = Consumer;
let arc_mut_second_consumer = Arc::new(Mutex::new(second_consumer));
match event.subscribe(arc_mut_second_consumer.clone()).err() {
None => panic!(),
Some(err) => println!("You cannot subscribe twice: {}", err),
}
event.unsubscribe(arc_mut_consumer.clone());
event.subscribe(arc_mut_second_consumer).unwrap();
assert_eq!(consumer.events_received.load(Relaxed), 1);
assert_eq!(second_consumer.events_received.load(Relaxed), 2);
}
}

View File

@ -1,7 +1,7 @@
use crate::types::events::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct GatewayResume {
pub token: String,
pub session_id: String,

View File

@ -64,3 +64,20 @@ async fn send_message_attachment() {
.unwrap();
common::teardown(bundle).await
}
#[tokio::test]
async fn read_messages() {
let mut bundle = common::setup().await;
// First create some messages to read
let mut message = types::MessageSendSchema {
content: Some("A Message!".to_string()),
..Default::default()
};
let _ = bundle
.user
.send_message(&mut message, bundle.channel.id.to_string(), None)
.await
.unwrap();
common::teardown(bundle).await
}