Compare commits
3 Commits
484c69229d
...
b0667a33fb
Author | SHA1 | Date |
---|---|---|
Flori | b0667a33fb | |
bitfl0wer | ebcb6b65e4 | |
bitfl0wer | 7554f90187 |
|
@ -233,6 +233,7 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"log",
|
"log",
|
||||||
"poem",
|
"poem",
|
||||||
|
"pubserve",
|
||||||
"rand",
|
"rand",
|
||||||
"regex",
|
"regex",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
@ -1594,6 +1595,15 @@ dependencies = [
|
||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pubserve"
|
||||||
|
version = "1.1.0-alpha.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b1781b2a51798c98a381e839e61bc5ce6426bd89bb9c3f9142de2086a80591cd"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "quote"
|
name = "quote"
|
||||||
version = "1.0.36"
|
version = "1.0.36"
|
||||||
|
|
|
@ -66,6 +66,7 @@ crypto_secretbox = { version = "0.1.1", optional = true }
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
flate2 = { version = "1.0.30", optional = true }
|
flate2 = { version = "1.0.30", optional = true }
|
||||||
webpki-roots = "0.26.3"
|
webpki-roots = "0.26.3"
|
||||||
|
pubserve = { version = "1.1.0-alpha.1", features = ["async", "send"] }
|
||||||
|
|
||||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||||
rustls = "0.21.10"
|
rustls = "0.21.10"
|
||||||
|
|
|
@ -17,9 +17,9 @@ use async_trait::async_trait;
|
||||||
use chorus::gateway::{Gateway, GatewayOptions};
|
use chorus::gateway::{Gateway, GatewayOptions};
|
||||||
use chorus::{
|
use chorus::{
|
||||||
self,
|
self,
|
||||||
gateway::Observer,
|
|
||||||
types::{GatewayIdentifyPayload, GatewayReady},
|
types::{GatewayIdentifyPayload, GatewayReady},
|
||||||
};
|
};
|
||||||
|
use pubserve::Subscriber;
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
use tokio::{self};
|
use tokio::{self};
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ pub struct ExampleObserver {}
|
||||||
// The Observer trait can be implemented for a struct for a given websocketevent to handle observing it
|
// The Observer trait can be implemented for a struct for a given websocketevent to handle observing it
|
||||||
// One struct can be an observer of multiple websocketevents, if needed
|
// One struct can be an observer of multiple websocketevents, if needed
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Observer<GatewayReady> for ExampleObserver {
|
impl Subscriber<GatewayReady> for ExampleObserver {
|
||||||
// After we subscribe to an event this function is called every time we receive it
|
// After we subscribe to an event this function is called every time we receive it
|
||||||
async fn update(&self, _data: &GatewayReady) {
|
async fn update(&self, _data: &GatewayReady) {
|
||||||
println!("Observed Ready!");
|
println!("Observed Ready!");
|
||||||
|
@ -56,7 +56,9 @@ async fn main() {
|
||||||
let options = GatewayOptions::default();
|
let options = GatewayOptions::default();
|
||||||
|
|
||||||
// Initiate the gateway connection
|
// Initiate the gateway connection
|
||||||
let gateway = Gateway::spawn(gateway_websocket_url, options).await.unwrap();
|
let gateway = Gateway::spawn(gateway_websocket_url, options)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// Create an instance of our observer
|
// Create an instance of our observer
|
||||||
let observer = ExampleObserver {};
|
let observer = ExampleObserver {};
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
|
use pubserve::Publisher;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::types;
|
use crate::types;
|
||||||
|
|
||||||
|
@ -23,144 +25,144 @@ pub struct Events {
|
||||||
pub call: Call,
|
pub call: Call,
|
||||||
pub voice: Voice,
|
pub voice: Voice,
|
||||||
pub webhooks: Webhooks,
|
pub webhooks: Webhooks,
|
||||||
pub gateway_identify_payload: GatewayEvent<types::GatewayIdentifyPayload>,
|
pub gateway_identify_payload: Publisher<types::GatewayIdentifyPayload>,
|
||||||
pub gateway_resume: GatewayEvent<types::GatewayResume>,
|
pub gateway_resume: Publisher<types::GatewayResume>,
|
||||||
pub error: GatewayEvent<GatewayError>,
|
pub error: Publisher<GatewayError>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Application {
|
pub struct Application {
|
||||||
pub command_permissions_update: GatewayEvent<types::ApplicationCommandPermissionsUpdate>,
|
pub command_permissions_update: Publisher<types::ApplicationCommandPermissionsUpdate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct AutoModeration {
|
pub struct AutoModeration {
|
||||||
pub rule_create: GatewayEvent<types::AutoModerationRuleCreate>,
|
pub rule_create: Publisher<types::AutoModerationRuleCreate>,
|
||||||
pub rule_update: GatewayEvent<types::AutoModerationRuleUpdate>,
|
pub rule_update: Publisher<types::AutoModerationRuleUpdate>,
|
||||||
pub rule_delete: GatewayEvent<types::AutoModerationRuleDelete>,
|
pub rule_delete: Publisher<types::AutoModerationRuleDelete>,
|
||||||
pub action_execution: GatewayEvent<types::AutoModerationActionExecution>,
|
pub action_execution: Publisher<types::AutoModerationActionExecution>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Session {
|
pub struct Session {
|
||||||
pub ready: GatewayEvent<types::GatewayReady>,
|
pub ready: Publisher<types::GatewayReady>,
|
||||||
pub ready_supplemental: GatewayEvent<types::GatewayReadySupplemental>,
|
pub ready_supplemental: Publisher<types::GatewayReadySupplemental>,
|
||||||
pub replace: GatewayEvent<types::SessionsReplace>,
|
pub replace: Publisher<types::SessionsReplace>,
|
||||||
pub reconnect: GatewayEvent<types::GatewayReconnect>,
|
pub reconnect: Publisher<types::GatewayReconnect>,
|
||||||
pub invalid: GatewayEvent<types::GatewayInvalidSession>,
|
pub invalid: Publisher<types::GatewayInvalidSession>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct StageInstance {
|
pub struct StageInstance {
|
||||||
pub create: GatewayEvent<types::StageInstanceCreate>,
|
pub create: Publisher<types::StageInstanceCreate>,
|
||||||
pub update: GatewayEvent<types::StageInstanceUpdate>,
|
pub update: Publisher<types::StageInstanceUpdate>,
|
||||||
pub delete: GatewayEvent<types::StageInstanceDelete>,
|
pub delete: Publisher<types::StageInstanceDelete>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Message {
|
pub struct Message {
|
||||||
pub create: GatewayEvent<types::MessageCreate>,
|
pub create: Publisher<types::MessageCreate>,
|
||||||
pub update: GatewayEvent<types::MessageUpdate>,
|
pub update: Publisher<types::MessageUpdate>,
|
||||||
pub delete: GatewayEvent<types::MessageDelete>,
|
pub delete: Publisher<types::MessageDelete>,
|
||||||
pub delete_bulk: GatewayEvent<types::MessageDeleteBulk>,
|
pub delete_bulk: Publisher<types::MessageDeleteBulk>,
|
||||||
pub reaction_add: GatewayEvent<types::MessageReactionAdd>,
|
pub reaction_add: Publisher<types::MessageReactionAdd>,
|
||||||
pub reaction_remove: GatewayEvent<types::MessageReactionRemove>,
|
pub reaction_remove: Publisher<types::MessageReactionRemove>,
|
||||||
pub reaction_remove_all: GatewayEvent<types::MessageReactionRemoveAll>,
|
pub reaction_remove_all: Publisher<types::MessageReactionRemoveAll>,
|
||||||
pub reaction_remove_emoji: GatewayEvent<types::MessageReactionRemoveEmoji>,
|
pub reaction_remove_emoji: Publisher<types::MessageReactionRemoveEmoji>,
|
||||||
pub ack: GatewayEvent<types::MessageACK>,
|
pub ack: Publisher<types::MessageACK>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct User {
|
pub struct User {
|
||||||
pub update: GatewayEvent<types::UserUpdate>,
|
pub update: Publisher<types::UserUpdate>,
|
||||||
pub guild_settings_update: GatewayEvent<types::UserGuildSettingsUpdate>,
|
pub guild_settings_update: Publisher<types::UserGuildSettingsUpdate>,
|
||||||
pub presence_update: GatewayEvent<types::PresenceUpdate>,
|
pub presence_update: Publisher<types::PresenceUpdate>,
|
||||||
pub typing_start: GatewayEvent<types::TypingStartEvent>,
|
pub typing_start: Publisher<types::TypingStartEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Relationship {
|
pub struct Relationship {
|
||||||
pub add: GatewayEvent<types::RelationshipAdd>,
|
pub add: Publisher<types::RelationshipAdd>,
|
||||||
pub remove: GatewayEvent<types::RelationshipRemove>,
|
pub remove: Publisher<types::RelationshipRemove>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Channel {
|
pub struct Channel {
|
||||||
pub create: GatewayEvent<types::ChannelCreate>,
|
pub create: Publisher<types::ChannelCreate>,
|
||||||
pub update: GatewayEvent<types::ChannelUpdate>,
|
pub update: Publisher<types::ChannelUpdate>,
|
||||||
pub unread_update: GatewayEvent<types::ChannelUnreadUpdate>,
|
pub unread_update: Publisher<types::ChannelUnreadUpdate>,
|
||||||
pub delete: GatewayEvent<types::ChannelDelete>,
|
pub delete: Publisher<types::ChannelDelete>,
|
||||||
pub pins_update: GatewayEvent<types::ChannelPinsUpdate>,
|
pub pins_update: Publisher<types::ChannelPinsUpdate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Thread {
|
pub struct Thread {
|
||||||
pub create: GatewayEvent<types::ThreadCreate>,
|
pub create: Publisher<types::ThreadCreate>,
|
||||||
pub update: GatewayEvent<types::ThreadUpdate>,
|
pub update: Publisher<types::ThreadUpdate>,
|
||||||
pub delete: GatewayEvent<types::ThreadDelete>,
|
pub delete: Publisher<types::ThreadDelete>,
|
||||||
pub list_sync: GatewayEvent<types::ThreadListSync>,
|
pub list_sync: Publisher<types::ThreadListSync>,
|
||||||
pub member_update: GatewayEvent<types::ThreadMemberUpdate>,
|
pub member_update: Publisher<types::ThreadMemberUpdate>,
|
||||||
pub members_update: GatewayEvent<types::ThreadMembersUpdate>,
|
pub members_update: Publisher<types::ThreadMembersUpdate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Guild {
|
pub struct Guild {
|
||||||
pub create: GatewayEvent<types::GuildCreate>,
|
pub create: Publisher<types::GuildCreate>,
|
||||||
pub update: GatewayEvent<types::GuildUpdate>,
|
pub update: Publisher<types::GuildUpdate>,
|
||||||
pub delete: GatewayEvent<types::GuildDelete>,
|
pub delete: Publisher<types::GuildDelete>,
|
||||||
pub audit_log_entry_create: GatewayEvent<types::GuildAuditLogEntryCreate>,
|
pub audit_log_entry_create: Publisher<types::GuildAuditLogEntryCreate>,
|
||||||
pub ban_add: GatewayEvent<types::GuildBanAdd>,
|
pub ban_add: Publisher<types::GuildBanAdd>,
|
||||||
pub ban_remove: GatewayEvent<types::GuildBanRemove>,
|
pub ban_remove: Publisher<types::GuildBanRemove>,
|
||||||
pub emojis_update: GatewayEvent<types::GuildEmojisUpdate>,
|
pub emojis_update: Publisher<types::GuildEmojisUpdate>,
|
||||||
pub stickers_update: GatewayEvent<types::GuildStickersUpdate>,
|
pub stickers_update: Publisher<types::GuildStickersUpdate>,
|
||||||
pub integrations_update: GatewayEvent<types::GuildIntegrationsUpdate>,
|
pub integrations_update: Publisher<types::GuildIntegrationsUpdate>,
|
||||||
pub member_add: GatewayEvent<types::GuildMemberAdd>,
|
pub member_add: Publisher<types::GuildMemberAdd>,
|
||||||
pub member_remove: GatewayEvent<types::GuildMemberRemove>,
|
pub member_remove: Publisher<types::GuildMemberRemove>,
|
||||||
pub member_update: GatewayEvent<types::GuildMemberUpdate>,
|
pub member_update: Publisher<types::GuildMemberUpdate>,
|
||||||
pub members_chunk: GatewayEvent<types::GuildMembersChunk>,
|
pub members_chunk: Publisher<types::GuildMembersChunk>,
|
||||||
pub role_create: GatewayEvent<types::GuildRoleCreate>,
|
pub role_create: Publisher<types::GuildRoleCreate>,
|
||||||
pub role_update: GatewayEvent<types::GuildRoleUpdate>,
|
pub role_update: Publisher<types::GuildRoleUpdate>,
|
||||||
pub role_delete: GatewayEvent<types::GuildRoleDelete>,
|
pub role_delete: Publisher<types::GuildRoleDelete>,
|
||||||
pub role_scheduled_event_create: GatewayEvent<types::GuildScheduledEventCreate>,
|
pub role_scheduled_event_create: Publisher<types::GuildScheduledEventCreate>,
|
||||||
pub role_scheduled_event_update: GatewayEvent<types::GuildScheduledEventUpdate>,
|
pub role_scheduled_event_update: Publisher<types::GuildScheduledEventUpdate>,
|
||||||
pub role_scheduled_event_delete: GatewayEvent<types::GuildScheduledEventDelete>,
|
pub role_scheduled_event_delete: Publisher<types::GuildScheduledEventDelete>,
|
||||||
pub role_scheduled_event_user_add: GatewayEvent<types::GuildScheduledEventUserAdd>,
|
pub role_scheduled_event_user_add: Publisher<types::GuildScheduledEventUserAdd>,
|
||||||
pub role_scheduled_event_user_remove: GatewayEvent<types::GuildScheduledEventUserRemove>,
|
pub role_scheduled_event_user_remove: Publisher<types::GuildScheduledEventUserRemove>,
|
||||||
pub passive_update_v1: GatewayEvent<types::PassiveUpdateV1>,
|
pub passive_update_v1: Publisher<types::PassiveUpdateV1>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Invite {
|
pub struct Invite {
|
||||||
pub create: GatewayEvent<types::InviteCreate>,
|
pub create: Publisher<types::InviteCreate>,
|
||||||
pub delete: GatewayEvent<types::InviteDelete>,
|
pub delete: Publisher<types::InviteDelete>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Integration {
|
pub struct Integration {
|
||||||
pub create: GatewayEvent<types::IntegrationCreate>,
|
pub create: Publisher<types::IntegrationCreate>,
|
||||||
pub update: GatewayEvent<types::IntegrationUpdate>,
|
pub update: Publisher<types::IntegrationUpdate>,
|
||||||
pub delete: GatewayEvent<types::IntegrationDelete>,
|
pub delete: Publisher<types::IntegrationDelete>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Interaction {
|
pub struct Interaction {
|
||||||
pub create: GatewayEvent<types::InteractionCreate>,
|
pub create: Publisher<types::InteractionCreate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Call {
|
pub struct Call {
|
||||||
pub create: GatewayEvent<types::CallCreate>,
|
pub create: Publisher<types::CallCreate>,
|
||||||
pub update: GatewayEvent<types::CallUpdate>,
|
pub update: Publisher<types::CallUpdate>,
|
||||||
pub delete: GatewayEvent<types::CallDelete>,
|
pub delete: Publisher<types::CallDelete>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Voice {
|
pub struct Voice {
|
||||||
pub state_update: GatewayEvent<types::VoiceStateUpdate>,
|
pub state_update: Publisher<types::VoiceStateUpdate>,
|
||||||
pub server_update: GatewayEvent<types::VoiceServerUpdate>,
|
pub server_update: Publisher<types::VoiceServerUpdate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Webhooks {
|
pub struct Webhooks {
|
||||||
pub update: GatewayEvent<types::WebhooksUpdate>,
|
pub update: Publisher<types::WebhooksUpdate>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ use std::time::Duration;
|
||||||
use flate2::Decompress;
|
use flate2::Decompress;
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use log::*;
|
use log::*;
|
||||||
|
use pubserve::Publisher;
|
||||||
#[cfg(not(target_arch = "wasm32"))]
|
#[cfg(not(target_arch = "wasm32"))]
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
|
||||||
|
@ -197,7 +198,7 @@ impl Gateway {
|
||||||
#[allow(dead_code)] // TODO: Remove this allow annotation
|
#[allow(dead_code)] // TODO: Remove this allow annotation
|
||||||
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
|
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
|
||||||
data: &'a str,
|
data: &'a str,
|
||||||
event: &mut GatewayEvent<T>,
|
event: &mut Publisher<T>,
|
||||||
) -> Result<(), serde_json::Error> {
|
) -> Result<(), serde_json::Error> {
|
||||||
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
|
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
|
||||||
|
|
||||||
|
@ -205,7 +206,7 @@ impl Gateway {
|
||||||
return Err(data_deserialize_result.err().unwrap());
|
return Err(data_deserialize_result.err().unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
event.notify(data_deserialize_result.unwrap()).await;
|
event.publish(data_deserialize_result.unwrap()).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,7 +254,7 @@ impl Gateway {
|
||||||
if let Some(error) = msg.error() {
|
if let Some(error) = msg.error() {
|
||||||
warn!("GW: Received error {:?}, connection will close..", error);
|
warn!("GW: Received error {:?}, connection will close..", error);
|
||||||
self.close().await;
|
self.close().await;
|
||||||
self.events.lock().await.error.notify(error).await;
|
self.events.lock().await.error.publish(error).await;
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
"Message unrecognised: {:?}, please open an issue on the chorus github",
|
"Message unrecognised: {:?}, please open an issue on the chorus github",
|
||||||
|
@ -292,7 +293,7 @@ impl Gateway {
|
||||||
let id = if message.id().is_some() {
|
let id = if message.id().is_some() {
|
||||||
message.id().unwrap()
|
message.id().unwrap()
|
||||||
} else {
|
} else {
|
||||||
event.notify(message).await;
|
event.publish(message).await;
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
if let Some(to_update) = store.get(&id) {
|
if let Some(to_update) = store.get(&id) {
|
||||||
|
@ -314,7 +315,7 @@ impl Gateway {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)?
|
)?
|
||||||
event.notify(message).await;
|
event.publish(message).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},)*
|
},)*
|
||||||
|
@ -329,7 +330,7 @@ impl Gateway {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Ok(sessions) => {
|
Ok(sessions) => {
|
||||||
self.events.lock().await.session.replace.notify(
|
self.events.lock().await.session.replace.publish(
|
||||||
types::SessionsReplace {sessions}
|
types::SessionsReplace {sessions}
|
||||||
).await;
|
).await;
|
||||||
}
|
}
|
||||||
|
@ -446,7 +447,7 @@ impl Gateway {
|
||||||
.await
|
.await
|
||||||
.session
|
.session
|
||||||
.reconnect
|
.reconnect
|
||||||
.notify(reconnect)
|
.publish(reconnect)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
GATEWAY_INVALID_SESSION => {
|
GATEWAY_INVALID_SESSION => {
|
||||||
|
@ -471,7 +472,7 @@ impl Gateway {
|
||||||
.await
|
.await
|
||||||
.session
|
.session
|
||||||
.invalid
|
.invalid
|
||||||
.notify(invalid_session)
|
.publish(invalid_session)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
// Starts our heartbeat
|
// Starts our heartbeat
|
||||||
|
|
|
@ -82,56 +82,3 @@ pub type ObservableObject = dyn Send + Sync + Any;
|
||||||
pub trait Updateable: 'static + Send + Sync {
|
pub trait Updateable: 'static + Send + Sync {
|
||||||
fn id(&self) -> Snowflake;
|
fn id(&self) -> Snowflake;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
|
|
||||||
/// an Observable. The Observer is notified when the Observable's data changes.
|
|
||||||
/// In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
|
|
||||||
/// Note that `Debug` is used to tell `Observer`s apart when unsubscribing.
|
|
||||||
#[async_trait]
|
|
||||||
pub trait Observer<T>: Sync + Send + std::fmt::Debug {
|
|
||||||
async fn update(&self, data: &T);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
|
|
||||||
/// change in the WebSocketEvent. GatewayEvents are observable.
|
|
||||||
#[derive(Default, Debug)]
|
|
||||||
pub struct GatewayEvent<T: WebSocketEvent> {
|
|
||||||
observers: Vec<Arc<dyn Observer<T>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: WebSocketEvent> GatewayEvent<T> {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
observers: Vec::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns true if the GatewayEvent is observed by at least one Observer.
|
|
||||||
pub fn is_observed(&self) -> bool {
|
|
||||||
!self.observers.is_empty()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Subscribes an Observer to the GatewayEvent.
|
|
||||||
pub fn subscribe(&mut self, observable: Arc<dyn Observer<T>>) {
|
|
||||||
self.observers.push(observable);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Unsubscribes an Observer from the GatewayEvent.
|
|
||||||
pub fn unsubscribe(&mut self, observable: &dyn Observer<T>) {
|
|
||||||
// .retain()'s closure retains only those elements of the vector, which have a different
|
|
||||||
// pointer value than observable.
|
|
||||||
// The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
|
|
||||||
// anddd there is no way to do that without using format
|
|
||||||
let to_remove = format!("{:?}", observable);
|
|
||||||
self.observers
|
|
||||||
.retain(|obs| format!("{:?}", obs) != to_remove);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Notifies the observers of the GatewayEvent.
|
|
||||||
pub(crate) async fn notify(&self, new_event_data: T) {
|
|
||||||
for observer in &self.observers {
|
|
||||||
observer.update(&new_event_data).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use log::{self, debug};
|
|
||||||
use reqwest::{Client, RequestBuilder, Response};
|
use reqwest::{Client, RequestBuilder, Response};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::from_str;
|
use serde_json::from_str;
|
||||||
|
|
|
@ -2,9 +2,10 @@
|
||||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
|
use pubserve::Publisher;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
errors::VoiceGatewayError,
|
errors::VoiceGatewayError,
|
||||||
gateway::GatewayEvent,
|
|
||||||
types::{
|
types::{
|
||||||
SessionDescription, SessionUpdate, Speaking, SsrcDefinition, VoiceBackendVersion,
|
SessionDescription, SessionUpdate, Speaking, SsrcDefinition, VoiceBackendVersion,
|
||||||
VoiceClientConnectFlags, VoiceClientConnectPlatform, VoiceClientDisconnection,
|
VoiceClientConnectFlags, VoiceClientConnectPlatform, VoiceClientDisconnection,
|
||||||
|
@ -14,15 +15,15 @@ use crate::{
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct VoiceEvents {
|
pub struct VoiceEvents {
|
||||||
pub voice_ready: GatewayEvent<VoiceReady>,
|
pub voice_ready: Publisher<VoiceReady>,
|
||||||
pub backend_version: GatewayEvent<VoiceBackendVersion>,
|
pub backend_version: Publisher<VoiceBackendVersion>,
|
||||||
pub session_description: GatewayEvent<SessionDescription>,
|
pub session_description: Publisher<SessionDescription>,
|
||||||
pub session_update: GatewayEvent<SessionUpdate>,
|
pub session_update: Publisher<SessionUpdate>,
|
||||||
pub speaking: GatewayEvent<Speaking>,
|
pub speaking: Publisher<Speaking>,
|
||||||
pub ssrc_definition: GatewayEvent<SsrcDefinition>,
|
pub ssrc_definition: Publisher<SsrcDefinition>,
|
||||||
pub client_disconnect: GatewayEvent<VoiceClientDisconnection>,
|
pub client_disconnect: Publisher<VoiceClientDisconnection>,
|
||||||
pub client_connect_flags: GatewayEvent<VoiceClientConnectFlags>,
|
pub client_connect_flags: Publisher<VoiceClientConnectFlags>,
|
||||||
pub client_connect_platform: GatewayEvent<VoiceClientConnectPlatform>,
|
pub client_connect_platform: Publisher<VoiceClientConnectPlatform>,
|
||||||
pub media_sink_wants: GatewayEvent<VoiceMediaSinkWants>,
|
pub media_sink_wants: Publisher<VoiceMediaSinkWants>,
|
||||||
pub error: GatewayEvent<VoiceGatewayError>,
|
pub error: Publisher<VoiceGatewayError>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
|
use pubserve::Publisher;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use futures_util::SinkExt;
|
use futures_util::SinkExt;
|
||||||
|
@ -16,7 +17,6 @@ use crate::gateway::Stream;
|
||||||
use crate::gateway::WebSocketBackend;
|
use crate::gateway::WebSocketBackend;
|
||||||
use crate::{
|
use crate::{
|
||||||
errors::VoiceGatewayError,
|
errors::VoiceGatewayError,
|
||||||
gateway::GatewayEvent,
|
|
||||||
types::{
|
types::{
|
||||||
VoiceGatewayReceivePayload, VoiceHelloData, WebSocketEvent, VOICE_BACKEND_VERSION,
|
VoiceGatewayReceivePayload, VoiceHelloData, WebSocketEvent, VOICE_BACKEND_VERSION,
|
||||||
VOICE_CLIENT_CONNECT_FLAGS, VOICE_CLIENT_CONNECT_PLATFORM, VOICE_CLIENT_DISCONNECT,
|
VOICE_CLIENT_CONNECT_FLAGS, VOICE_CLIENT_CONNECT_PLATFORM, VOICE_CLIENT_DISCONNECT,
|
||||||
|
@ -160,7 +160,7 @@ impl VoiceGateway {
|
||||||
/// (Called for every event in handle_message)
|
/// (Called for every event in handle_message)
|
||||||
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
|
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
|
||||||
data: &'a str,
|
data: &'a str,
|
||||||
event: &mut GatewayEvent<T>,
|
event: &mut Publisher<T>,
|
||||||
) -> Result<(), serde_json::Error> {
|
) -> Result<(), serde_json::Error> {
|
||||||
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
|
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
|
||||||
|
|
||||||
|
@ -168,7 +168,7 @@ impl VoiceGateway {
|
||||||
return Err(data_deserialize_result.err().unwrap());
|
return Err(data_deserialize_result.err().unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
event.notify(data_deserialize_result.unwrap()).await;
|
event.publish(data_deserialize_result.unwrap()).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,7 +182,7 @@ impl VoiceGateway {
|
||||||
if let Some(error) = msg.error() {
|
if let Some(error) = msg.error() {
|
||||||
warn!("GW: Received error {:?}, connection will close..", error);
|
warn!("GW: Received error {:?}, connection will close..", error);
|
||||||
self.close().await;
|
self.close().await;
|
||||||
self.events.lock().await.error.notify(error).await;
|
self.events.lock().await.error.publish(error).await;
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
"Message unrecognised: {:?}, please open an issue on the chorus github",
|
"Message unrecognised: {:?}, please open an issue on the chorus github",
|
||||||
|
|
|
@ -3,23 +3,24 @@
|
||||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
use discortp::{rtcp::Rtcp, rtp::Rtp};
|
use discortp::{rtcp::Rtcp, rtp::Rtp};
|
||||||
|
use pubserve::Publisher;
|
||||||
|
|
||||||
use crate::{gateway::GatewayEvent, types::WebSocketEvent};
|
use crate::types::WebSocketEvent;
|
||||||
|
|
||||||
impl WebSocketEvent for Rtp {}
|
impl WebSocketEvent for Rtp {}
|
||||||
impl WebSocketEvent for Rtcp {}
|
impl WebSocketEvent for Rtcp {}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct VoiceUDPEvents {
|
pub struct VoiceUDPEvents {
|
||||||
pub rtp: GatewayEvent<Rtp>,
|
pub rtp: Publisher<Rtp>,
|
||||||
pub rtcp: GatewayEvent<Rtcp>,
|
pub rtcp: Publisher<Rtcp>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for VoiceUDPEvents {
|
impl Default for VoiceUDPEvents {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
rtp: GatewayEvent::new(),
|
rtp: Publisher::new(),
|
||||||
rtcp: GatewayEvent::new(),
|
rtcp: Publisher::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,7 +214,7 @@ impl UdpHandler {
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
.rtp
|
.rtp
|
||||||
.notify(rtp_with_decrypted_data)
|
.publish(rtp_with_decrypted_data)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
Demuxed::Rtcp(rtcp) => {
|
Demuxed::Rtcp(rtcp) => {
|
||||||
|
@ -251,7 +251,7 @@ impl UdpHandler {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.events.lock().await.rtcp.notify(rtcp_data).await;
|
self.events.lock().await.rtcp.publish(rtcp_data).await;
|
||||||
}
|
}
|
||||||
Demuxed::FailedParse(e) => {
|
Demuxed::FailedParse(e) => {
|
||||||
trace!("VUDP: Failed to parse packet: {:?}", e);
|
trace!("VUDP: Failed to parse packet: {:?}", e);
|
||||||
|
|
|
@ -14,6 +14,7 @@ use chorus::types::{
|
||||||
self, Channel, ChannelCreateSchema, ChannelModifySchema, GatewayReady, IntoShared,
|
self, Channel, ChannelCreateSchema, ChannelModifySchema, GatewayReady, IntoShared,
|
||||||
RoleCreateModifySchema, RoleObject,
|
RoleCreateModifySchema, RoleObject,
|
||||||
};
|
};
|
||||||
|
use pubserve::Subscriber;
|
||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
use wasm_bindgen_test::*;
|
use wasm_bindgen_test::*;
|
||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
|
@ -30,7 +31,9 @@ use wasmtimer::tokio::sleep;
|
||||||
async fn test_gateway_establish() {
|
async fn test_gateway_establish() {
|
||||||
let bundle = common::setup().await;
|
let bundle = common::setup().await;
|
||||||
|
|
||||||
let _: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()).await.unwrap();
|
let _: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
common::teardown(bundle).await
|
common::teardown(bundle).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +43,7 @@ struct GatewayReadyObserver {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Observer<GatewayReady> for GatewayReadyObserver {
|
impl Subscriber<GatewayReady> for GatewayReadyObserver {
|
||||||
async fn update(&self, _data: &GatewayReady) {
|
async fn update(&self, _data: &GatewayReady) {
|
||||||
self.channel.send(()).await.unwrap();
|
self.channel.send(()).await.unwrap();
|
||||||
}
|
}
|
||||||
|
@ -52,7 +55,9 @@ impl Observer<GatewayReady> for GatewayReadyObserver {
|
||||||
async fn test_gateway_authenticate() {
|
async fn test_gateway_authenticate() {
|
||||||
let bundle = common::setup().await;
|
let bundle = common::setup().await;
|
||||||
|
|
||||||
let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()).await.unwrap();
|
let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1);
|
let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue