Merge pull request #117 from SpecificProtagonist/gateway-event
Simplify GatewayEvent
This commit is contained in:
commit
dcacfacda9
|
@ -4,7 +4,7 @@ use chorus::{
|
||||||
types::{GatewayIdentifyPayload, GatewayReady},
|
types::{GatewayIdentifyPayload, GatewayReady},
|
||||||
};
|
};
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
use tokio::{self, sync::Mutex, time::sleep};
|
use tokio::{self, time::sleep};
|
||||||
|
|
||||||
// This example creates a simple gateway connection and a basic observer struct
|
// This example creates a simple gateway connection and a basic observer struct
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@ pub struct ExampleObserver {}
|
||||||
// One struct can be an observer of multiple websocketevents, if needed
|
// One struct can be an observer of multiple websocketevents, if needed
|
||||||
impl Observer<GatewayReady> for ExampleObserver {
|
impl Observer<GatewayReady> for ExampleObserver {
|
||||||
// After we subscribe to an event this function is called every time we receive it
|
// After we subscribe to an event this function is called every time we receive it
|
||||||
fn update(&mut self, _data: &GatewayReady) {
|
fn update(&self, _data: &GatewayReady) {
|
||||||
println!("Observed Ready!");
|
println!("Observed Ready!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,8 +33,8 @@ async fn main() {
|
||||||
// Create an instance of our observer
|
// Create an instance of our observer
|
||||||
let observer = ExampleObserver {};
|
let observer = ExampleObserver {};
|
||||||
|
|
||||||
// Because observers have to reside in between the main and gateway thread, (they have to be shared between both) we need to put them in an Arc<Mutex>
|
// Share ownership of the observer with the gateway
|
||||||
let shared_observer = Arc::new(Mutex::new(observer));
|
let shared_observer = Arc::new(observer);
|
||||||
|
|
||||||
// Subscribe our observer to the Ready event on this gateway
|
// Subscribe our observer to the Ready event on this gateway
|
||||||
// From now on observer.update(data) will be called every time we receive the Ready event
|
// From now on observer.update(data) will be called every time we receive the Ready event
|
||||||
|
@ -44,8 +44,7 @@ async fn main() {
|
||||||
.await
|
.await
|
||||||
.session
|
.session
|
||||||
.ready
|
.ready
|
||||||
.subscribe(shared_observer)
|
.subscribe(shared_observer);
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Authenticate so we will receive any events
|
// Authenticate so we will receive any events
|
||||||
let token = "SecretToken".to_string();
|
let token = "SecretToken".to_string();
|
||||||
|
|
171
src/gateway.rs
171
src/gateway.rs
|
@ -3,6 +3,7 @@ use crate::errors::ObserverError;
|
||||||
use crate::gateway::events::Events;
|
use crate::gateway::events::Events;
|
||||||
use crate::types;
|
use crate::types;
|
||||||
use crate::types::WebSocketEvent;
|
use crate::types::WebSocketEvent;
|
||||||
|
use std::any::Any;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures_util::stream::SplitSink;
|
use futures_util::stream::SplitSink;
|
||||||
|
@ -72,11 +73,9 @@ const GATEWAY_LAZY_REQUEST: u8 = 14;
|
||||||
/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms
|
/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms
|
||||||
const HEARTBEAT_ACK_TIMEOUT: u128 = 2000;
|
const HEARTBEAT_ACK_TIMEOUT: u128 = 2000;
|
||||||
|
|
||||||
|
/// Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError].
|
||||||
|
/// This struct is used internally when handling messages.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
/**
|
|
||||||
Represents a messsage received from the gateway. This will be either a [GatewayReceivePayload], containing events, or a [GatewayError].
|
|
||||||
This struct is used internally when handling messages.
|
|
||||||
*/
|
|
||||||
pub struct GatewayMessage {
|
pub struct GatewayMessage {
|
||||||
/// The message we received from the server
|
/// The message we received from the server
|
||||||
message: tokio_tungstenite::tungstenite::Message,
|
message: tokio_tungstenite::tungstenite::Message,
|
||||||
|
@ -148,13 +147,11 @@ impl GatewayMessage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Represents a handle to a Gateway connection. A Gateway connection will create observable
|
||||||
|
/// [`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently
|
||||||
|
/// implemented [Types] with the trait [`WebSocketEvent`]
|
||||||
|
/// Using this handle you can also send Gateway Events directly.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/**
|
|
||||||
Represents a handle to a Gateway connection. A Gateway connection will create observable
|
|
||||||
[`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently
|
|
||||||
implemented [Types] with the trait [`WebSocketEvent`]
|
|
||||||
Using this handle you can also send Gateway Events directly.
|
|
||||||
*/
|
|
||||||
pub struct GatewayHandle {
|
pub struct GatewayHandle {
|
||||||
pub url: String,
|
pub url: String,
|
||||||
pub events: Arc<Mutex<Events>>,
|
pub events: Arc<Mutex<Events>>,
|
||||||
|
@ -395,7 +392,7 @@ impl Gateway {
|
||||||
return Err(data_deserialize_result.err().unwrap());
|
return Err(data_deserialize_result.err().unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
event.update_data(data_deserialize_result.unwrap()).await;
|
event.notify(data_deserialize_result.unwrap()).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1370,13 +1367,7 @@ impl Gateway {
|
||||||
sessions: result.unwrap(),
|
sessions: result.unwrap(),
|
||||||
};
|
};
|
||||||
|
|
||||||
self.events
|
self.events.lock().await.session.replace.notify(data).await;
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.session
|
|
||||||
.replace
|
|
||||||
.update_data(data)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
"USER_UPDATE" => {
|
"USER_UPDATE" => {
|
||||||
let event = &mut self.events.lock().await.user.update;
|
let event = &mut self.events.lock().await.user.update;
|
||||||
|
@ -1532,9 +1523,7 @@ impl Gateway {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// Handles sending heartbeats to the gateway in another thread
|
||||||
Handles sending heartbeats to the gateway in another thread
|
|
||||||
*/
|
|
||||||
struct HeartbeatHandler {
|
struct HeartbeatHandler {
|
||||||
/// The heartbeat interval in milliseconds
|
/// The heartbeat interval in milliseconds
|
||||||
pub heartbeat_interval: u128,
|
pub heartbeat_interval: u128,
|
||||||
|
@ -1669,10 +1658,8 @@ impl HeartbeatHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// Used for communications between the heartbeat and gateway thread.
|
||||||
Used for communications between the heartbeat and gateway thread.
|
/// Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server
|
||||||
Either signifies a sequence number update, a heartbeat ACK or a Heartbeat request by the server
|
|
||||||
*/
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
struct HeartbeatThreadCommunication {
|
struct HeartbeatThreadCommunication {
|
||||||
/// The opcode for the communication we received, if relevant
|
/// The opcode for the communication we received, if relevant
|
||||||
|
@ -1681,89 +1668,47 @@ struct HeartbeatThreadCommunication {
|
||||||
sequence_number: Option<u64>,
|
sequence_number: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
|
||||||
Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
|
/// an Observable. The Observer is notified when the Observable's data changes.
|
||||||
an Observable. The Observer is notified when the Observable's data changes.
|
/// In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
|
||||||
In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
|
/// Note that `Debug` is used to tell `Observer`s apart when unsubscribing.
|
||||||
*/
|
pub trait Observer<T>: Sync + Send + std::fmt::Debug {
|
||||||
pub trait Observer<T: types::WebSocketEvent>: std::fmt::Debug {
|
fn update(&self, data: &T);
|
||||||
fn update(&mut self, data: &T);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
|
/// GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
|
||||||
change in the WebSocketEvent. GatewayEvents are observable.
|
/// change in the WebSocketEvent. GatewayEvents are observable.
|
||||||
*/
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct GatewayEvent<T: types::WebSocketEvent> {
|
pub struct GatewayEvent<T: WebSocketEvent> {
|
||||||
observers: Vec<Arc<Mutex<dyn Observer<T> + Sync + Send>>>,
|
observers: Vec<Arc<dyn Observer<T>>>,
|
||||||
pub event_data: T,
|
|
||||||
pub is_observed: bool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: types::WebSocketEvent> GatewayEvent<T> {
|
impl<T: WebSocketEvent> GatewayEvent<T> {
|
||||||
fn new(event_data: T) -> Self {
|
/// Returns true if the GatewayEvent is observed by at least one Observer.
|
||||||
Self {
|
|
||||||
is_observed: false,
|
|
||||||
observers: Vec::new(),
|
|
||||||
event_data,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
Returns true if the GatewayEvent is observed by at least one Observer.
|
|
||||||
*/
|
|
||||||
pub fn is_observed(&self) -> bool {
|
pub fn is_observed(&self) -> bool {
|
||||||
self.is_observed
|
!self.observers.is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// Subscribes an Observer to the GatewayEvent.
|
||||||
Subscribes an Observer to the GatewayEvent. Returns an error if the GatewayEvent is already
|
pub fn subscribe(&mut self, observable: Arc<dyn Observer<T>>) {
|
||||||
observed.
|
|
||||||
# Errors
|
|
||||||
Returns an error if the GatewayEvent is already observed.
|
|
||||||
Error type: [`ObserverError::AlreadySubscribedError`]
|
|
||||||
*/
|
|
||||||
pub fn subscribe(
|
|
||||||
&mut self,
|
|
||||||
observable: Arc<Mutex<dyn Observer<T> + Sync + Send>>,
|
|
||||||
) -> Result<(), ObserverError> {
|
|
||||||
if self.is_observed {
|
|
||||||
return Err(ObserverError::AlreadySubscribedError);
|
|
||||||
}
|
|
||||||
self.is_observed = true;
|
|
||||||
self.observers.push(observable);
|
self.observers.push(observable);
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// Unsubscribes an Observer from the GatewayEvent.
|
||||||
Unsubscribes an Observer from the GatewayEvent.
|
pub fn unsubscribe(&mut self, observable: &dyn Observer<T>) {
|
||||||
*/
|
|
||||||
pub fn unsubscribe(&mut self, observable: Arc<Mutex<dyn Observer<T> + Sync + Send>>) {
|
|
||||||
// .retain()'s closure retains only those elements of the vector, which have a different
|
// .retain()'s closure retains only those elements of the vector, which have a different
|
||||||
// pointer value than observable.
|
// pointer value than observable.
|
||||||
// The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
|
// The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
|
||||||
// anddd there is no way to do that without using format
|
// anddd there is no way to do that without using format
|
||||||
|
let to_remove = format!("{:?}", observable);
|
||||||
self.observers
|
self.observers
|
||||||
.retain(|obs| format!("{:?}", obs) != format!("{:?}", &observable));
|
.retain(|obs| format!("{:?}", obs) != to_remove);
|
||||||
self.is_observed = !self.observers.is_empty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// Notifies the observers of the GatewayEvent.
|
||||||
Updates the GatewayEvent's data and notifies the observers.
|
async fn notify(&self, new_event_data: T) {
|
||||||
*/
|
|
||||||
async fn update_data(&mut self, new_event_data: T) {
|
|
||||||
self.event_data = new_event_data;
|
|
||||||
self.notify().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
Notifies the observers of the GatewayEvent.
|
|
||||||
*/
|
|
||||||
async fn notify(&self) {
|
|
||||||
for observer in &self.observers {
|
for observer in &self.observers {
|
||||||
let mut observer_lock = observer.lock().await;
|
observer.update(&new_event_data);
|
||||||
observer_lock.update(&self.event_data);
|
|
||||||
drop(observer_lock);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1932,23 +1877,23 @@ mod events {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod example {
|
mod example {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use std::sync::atomic::{AtomicI32, Ordering::Relaxed};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Consumer;
|
struct Consumer {
|
||||||
|
name: String,
|
||||||
|
events_received: AtomicI32,
|
||||||
|
}
|
||||||
|
|
||||||
impl Observer<types::GatewayResume> for Consumer {
|
impl Observer<types::GatewayResume> for Consumer {
|
||||||
fn update(&mut self, data: &types::GatewayResume) {
|
fn update(&self, _data: &types::GatewayResume) {
|
||||||
println!("{}", data.token)
|
self.events_received.fetch_add(1, Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_observer_behavior() {
|
async fn test_observer_behavior() {
|
||||||
let mut event = GatewayEvent::new(types::GatewayResume {
|
let mut event = GatewayEvent::default();
|
||||||
token: "start".to_string(),
|
|
||||||
session_id: "start".to_string(),
|
|
||||||
seq: "start".to_string(),
|
|
||||||
});
|
|
||||||
|
|
||||||
let new_data = types::GatewayResume {
|
let new_data = types::GatewayResume {
|
||||||
token: "token_3276ha37am3".to_string(),
|
token: "token_3276ha37am3".to_string(),
|
||||||
|
@ -1956,25 +1901,23 @@ mod example {
|
||||||
seq: "3".to_string(),
|
seq: "3".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let consumer = Consumer;
|
let consumer = Arc::new(Consumer {
|
||||||
let arc_mut_consumer = Arc::new(Mutex::new(consumer));
|
name: "first".into(),
|
||||||
|
events_received: 0.into(),
|
||||||
|
});
|
||||||
|
event.subscribe(consumer.clone());
|
||||||
|
|
||||||
event.subscribe(arc_mut_consumer.clone()).unwrap();
|
let second_consumer = Arc::new(Consumer {
|
||||||
|
name: "second".into(),
|
||||||
|
events_received: 0.into(),
|
||||||
|
});
|
||||||
|
event.subscribe(second_consumer.clone());
|
||||||
|
|
||||||
event.notify().await;
|
event.notify(new_data.clone()).await;
|
||||||
|
event.unsubscribe(&*consumer);
|
||||||
|
event.notify(new_data).await;
|
||||||
|
|
||||||
event.update_data(new_data).await;
|
assert_eq!(consumer.events_received.load(Relaxed), 1);
|
||||||
|
assert_eq!(second_consumer.events_received.load(Relaxed), 2);
|
||||||
let second_consumer = Consumer;
|
|
||||||
let arc_mut_second_consumer = Arc::new(Mutex::new(second_consumer));
|
|
||||||
|
|
||||||
match event.subscribe(arc_mut_second_consumer.clone()).err() {
|
|
||||||
None => panic!(),
|
|
||||||
Some(err) => println!("You cannot subscribe twice: {}", err),
|
|
||||||
}
|
|
||||||
|
|
||||||
event.unsubscribe(arc_mut_consumer.clone());
|
|
||||||
|
|
||||||
event.subscribe(arc_mut_second_consumer).unwrap();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use crate::types::events::WebSocketEvent;
|
use crate::types::events::WebSocketEvent;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize, Default)]
|
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
|
||||||
pub struct GatewayResume {
|
pub struct GatewayResume {
|
||||||
pub token: String,
|
pub token: String,
|
||||||
pub session_id: String,
|
pub session_id: String,
|
||||||
|
|
|
@ -64,3 +64,20 @@ async fn send_message_attachment() {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
common::teardown(bundle).await
|
common::teardown(bundle).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn read_messages() {
|
||||||
|
let mut bundle = common::setup().await;
|
||||||
|
|
||||||
|
// First create some messages to read
|
||||||
|
let mut message = types::MessageSendSchema {
|
||||||
|
content: Some("A Message!".to_string()),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let _ = bundle
|
||||||
|
.user
|
||||||
|
.send_message(&mut message, bundle.channel.id.to_string(), None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
common::teardown(bundle).await
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue