Compare commits
4 Commits
8ad1062e9e
...
ea62c35b3e
Author | SHA1 | Date |
---|---|---|
xystrive | ea62c35b3e | |
Flori | b0667a33fb | |
bitfl0wer | ebcb6b65e4 | |
bitfl0wer | 7554f90187 |
|
@ -233,6 +233,7 @@ dependencies = [
|
|||
"lazy_static",
|
||||
"log",
|
||||
"poem",
|
||||
"pubserve",
|
||||
"rand",
|
||||
"regex",
|
||||
"reqwest",
|
||||
|
@ -1594,6 +1595,15 @@ dependencies = [
|
|||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pubserve"
|
||||
version = "1.1.0-alpha.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b1781b2a51798c98a381e839e61bc5ce6426bd89bb9c3f9142de2086a80591cd"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.36"
|
||||
|
|
|
@ -66,6 +66,7 @@ crypto_secretbox = { version = "0.1.1", optional = true }
|
|||
rand = "0.8.5"
|
||||
flate2 = { version = "1.0.30", optional = true }
|
||||
webpki-roots = "0.26.3"
|
||||
pubserve = { version = "1.1.0-alpha.1", features = ["async", "send"] }
|
||||
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||
rustls = "0.21.10"
|
||||
|
|
|
@ -17,9 +17,9 @@ use async_trait::async_trait;
|
|||
use chorus::gateway::{Gateway, GatewayOptions};
|
||||
use chorus::{
|
||||
self,
|
||||
gateway::Observer,
|
||||
types::{GatewayIdentifyPayload, GatewayReady},
|
||||
};
|
||||
use pubserve::Subscriber;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::{self};
|
||||
|
||||
|
@ -38,7 +38,7 @@ pub struct ExampleObserver {}
|
|||
// The Observer trait can be implemented for a struct for a given websocketevent to handle observing it
|
||||
// One struct can be an observer of multiple websocketevents, if needed
|
||||
#[async_trait]
|
||||
impl Observer<GatewayReady> for ExampleObserver {
|
||||
impl Subscriber<GatewayReady> for ExampleObserver {
|
||||
// After we subscribe to an event this function is called every time we receive it
|
||||
async fn update(&self, _data: &GatewayReady) {
|
||||
println!("Observed Ready!");
|
||||
|
@ -56,7 +56,9 @@ async fn main() {
|
|||
let options = GatewayOptions::default();
|
||||
|
||||
// Initiate the gateway connection
|
||||
let gateway = Gateway::spawn(gateway_websocket_url, options).await.unwrap();
|
||||
let gateway = Gateway::spawn(gateway_websocket_url, options)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create an instance of our observer
|
||||
let observer = ExampleObserver {};
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
use pubserve::Publisher;
|
||||
|
||||
use super::*;
|
||||
use crate::types;
|
||||
|
||||
|
@ -23,144 +25,144 @@ pub struct Events {
|
|||
pub call: Call,
|
||||
pub voice: Voice,
|
||||
pub webhooks: Webhooks,
|
||||
pub gateway_identify_payload: GatewayEvent<types::GatewayIdentifyPayload>,
|
||||
pub gateway_resume: GatewayEvent<types::GatewayResume>,
|
||||
pub error: GatewayEvent<GatewayError>,
|
||||
pub gateway_identify_payload: Publisher<types::GatewayIdentifyPayload>,
|
||||
pub gateway_resume: Publisher<types::GatewayResume>,
|
||||
pub error: Publisher<GatewayError>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Application {
|
||||
pub command_permissions_update: GatewayEvent<types::ApplicationCommandPermissionsUpdate>,
|
||||
pub command_permissions_update: Publisher<types::ApplicationCommandPermissionsUpdate>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct AutoModeration {
|
||||
pub rule_create: GatewayEvent<types::AutoModerationRuleCreate>,
|
||||
pub rule_update: GatewayEvent<types::AutoModerationRuleUpdate>,
|
||||
pub rule_delete: GatewayEvent<types::AutoModerationRuleDelete>,
|
||||
pub action_execution: GatewayEvent<types::AutoModerationActionExecution>,
|
||||
pub rule_create: Publisher<types::AutoModerationRuleCreate>,
|
||||
pub rule_update: Publisher<types::AutoModerationRuleUpdate>,
|
||||
pub rule_delete: Publisher<types::AutoModerationRuleDelete>,
|
||||
pub action_execution: Publisher<types::AutoModerationActionExecution>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Session {
|
||||
pub ready: GatewayEvent<types::GatewayReady>,
|
||||
pub ready_supplemental: GatewayEvent<types::GatewayReadySupplemental>,
|
||||
pub replace: GatewayEvent<types::SessionsReplace>,
|
||||
pub reconnect: GatewayEvent<types::GatewayReconnect>,
|
||||
pub invalid: GatewayEvent<types::GatewayInvalidSession>,
|
||||
pub ready: Publisher<types::GatewayReady>,
|
||||
pub ready_supplemental: Publisher<types::GatewayReadySupplemental>,
|
||||
pub replace: Publisher<types::SessionsReplace>,
|
||||
pub reconnect: Publisher<types::GatewayReconnect>,
|
||||
pub invalid: Publisher<types::GatewayInvalidSession>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct StageInstance {
|
||||
pub create: GatewayEvent<types::StageInstanceCreate>,
|
||||
pub update: GatewayEvent<types::StageInstanceUpdate>,
|
||||
pub delete: GatewayEvent<types::StageInstanceDelete>,
|
||||
pub create: Publisher<types::StageInstanceCreate>,
|
||||
pub update: Publisher<types::StageInstanceUpdate>,
|
||||
pub delete: Publisher<types::StageInstanceDelete>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Message {
|
||||
pub create: GatewayEvent<types::MessageCreate>,
|
||||
pub update: GatewayEvent<types::MessageUpdate>,
|
||||
pub delete: GatewayEvent<types::MessageDelete>,
|
||||
pub delete_bulk: GatewayEvent<types::MessageDeleteBulk>,
|
||||
pub reaction_add: GatewayEvent<types::MessageReactionAdd>,
|
||||
pub reaction_remove: GatewayEvent<types::MessageReactionRemove>,
|
||||
pub reaction_remove_all: GatewayEvent<types::MessageReactionRemoveAll>,
|
||||
pub reaction_remove_emoji: GatewayEvent<types::MessageReactionRemoveEmoji>,
|
||||
pub ack: GatewayEvent<types::MessageACK>,
|
||||
pub create: Publisher<types::MessageCreate>,
|
||||
pub update: Publisher<types::MessageUpdate>,
|
||||
pub delete: Publisher<types::MessageDelete>,
|
||||
pub delete_bulk: Publisher<types::MessageDeleteBulk>,
|
||||
pub reaction_add: Publisher<types::MessageReactionAdd>,
|
||||
pub reaction_remove: Publisher<types::MessageReactionRemove>,
|
||||
pub reaction_remove_all: Publisher<types::MessageReactionRemoveAll>,
|
||||
pub reaction_remove_emoji: Publisher<types::MessageReactionRemoveEmoji>,
|
||||
pub ack: Publisher<types::MessageACK>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct User {
|
||||
pub update: GatewayEvent<types::UserUpdate>,
|
||||
pub guild_settings_update: GatewayEvent<types::UserGuildSettingsUpdate>,
|
||||
pub presence_update: GatewayEvent<types::PresenceUpdate>,
|
||||
pub typing_start: GatewayEvent<types::TypingStartEvent>,
|
||||
pub update: Publisher<types::UserUpdate>,
|
||||
pub guild_settings_update: Publisher<types::UserGuildSettingsUpdate>,
|
||||
pub presence_update: Publisher<types::PresenceUpdate>,
|
||||
pub typing_start: Publisher<types::TypingStartEvent>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Relationship {
|
||||
pub add: GatewayEvent<types::RelationshipAdd>,
|
||||
pub remove: GatewayEvent<types::RelationshipRemove>,
|
||||
pub add: Publisher<types::RelationshipAdd>,
|
||||
pub remove: Publisher<types::RelationshipRemove>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Channel {
|
||||
pub create: GatewayEvent<types::ChannelCreate>,
|
||||
pub update: GatewayEvent<types::ChannelUpdate>,
|
||||
pub unread_update: GatewayEvent<types::ChannelUnreadUpdate>,
|
||||
pub delete: GatewayEvent<types::ChannelDelete>,
|
||||
pub pins_update: GatewayEvent<types::ChannelPinsUpdate>,
|
||||
pub create: Publisher<types::ChannelCreate>,
|
||||
pub update: Publisher<types::ChannelUpdate>,
|
||||
pub unread_update: Publisher<types::ChannelUnreadUpdate>,
|
||||
pub delete: Publisher<types::ChannelDelete>,
|
||||
pub pins_update: Publisher<types::ChannelPinsUpdate>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Thread {
|
||||
pub create: GatewayEvent<types::ThreadCreate>,
|
||||
pub update: GatewayEvent<types::ThreadUpdate>,
|
||||
pub delete: GatewayEvent<types::ThreadDelete>,
|
||||
pub list_sync: GatewayEvent<types::ThreadListSync>,
|
||||
pub member_update: GatewayEvent<types::ThreadMemberUpdate>,
|
||||
pub members_update: GatewayEvent<types::ThreadMembersUpdate>,
|
||||
pub create: Publisher<types::ThreadCreate>,
|
||||
pub update: Publisher<types::ThreadUpdate>,
|
||||
pub delete: Publisher<types::ThreadDelete>,
|
||||
pub list_sync: Publisher<types::ThreadListSync>,
|
||||
pub member_update: Publisher<types::ThreadMemberUpdate>,
|
||||
pub members_update: Publisher<types::ThreadMembersUpdate>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Guild {
|
||||
pub create: GatewayEvent<types::GuildCreate>,
|
||||
pub update: GatewayEvent<types::GuildUpdate>,
|
||||
pub delete: GatewayEvent<types::GuildDelete>,
|
||||
pub audit_log_entry_create: GatewayEvent<types::GuildAuditLogEntryCreate>,
|
||||
pub ban_add: GatewayEvent<types::GuildBanAdd>,
|
||||
pub ban_remove: GatewayEvent<types::GuildBanRemove>,
|
||||
pub emojis_update: GatewayEvent<types::GuildEmojisUpdate>,
|
||||
pub stickers_update: GatewayEvent<types::GuildStickersUpdate>,
|
||||
pub integrations_update: GatewayEvent<types::GuildIntegrationsUpdate>,
|
||||
pub member_add: GatewayEvent<types::GuildMemberAdd>,
|
||||
pub member_remove: GatewayEvent<types::GuildMemberRemove>,
|
||||
pub member_update: GatewayEvent<types::GuildMemberUpdate>,
|
||||
pub members_chunk: GatewayEvent<types::GuildMembersChunk>,
|
||||
pub role_create: GatewayEvent<types::GuildRoleCreate>,
|
||||
pub role_update: GatewayEvent<types::GuildRoleUpdate>,
|
||||
pub role_delete: GatewayEvent<types::GuildRoleDelete>,
|
||||
pub role_scheduled_event_create: GatewayEvent<types::GuildScheduledEventCreate>,
|
||||
pub role_scheduled_event_update: GatewayEvent<types::GuildScheduledEventUpdate>,
|
||||
pub role_scheduled_event_delete: GatewayEvent<types::GuildScheduledEventDelete>,
|
||||
pub role_scheduled_event_user_add: GatewayEvent<types::GuildScheduledEventUserAdd>,
|
||||
pub role_scheduled_event_user_remove: GatewayEvent<types::GuildScheduledEventUserRemove>,
|
||||
pub passive_update_v1: GatewayEvent<types::PassiveUpdateV1>,
|
||||
pub create: Publisher<types::GuildCreate>,
|
||||
pub update: Publisher<types::GuildUpdate>,
|
||||
pub delete: Publisher<types::GuildDelete>,
|
||||
pub audit_log_entry_create: Publisher<types::GuildAuditLogEntryCreate>,
|
||||
pub ban_add: Publisher<types::GuildBanAdd>,
|
||||
pub ban_remove: Publisher<types::GuildBanRemove>,
|
||||
pub emojis_update: Publisher<types::GuildEmojisUpdate>,
|
||||
pub stickers_update: Publisher<types::GuildStickersUpdate>,
|
||||
pub integrations_update: Publisher<types::GuildIntegrationsUpdate>,
|
||||
pub member_add: Publisher<types::GuildMemberAdd>,
|
||||
pub member_remove: Publisher<types::GuildMemberRemove>,
|
||||
pub member_update: Publisher<types::GuildMemberUpdate>,
|
||||
pub members_chunk: Publisher<types::GuildMembersChunk>,
|
||||
pub role_create: Publisher<types::GuildRoleCreate>,
|
||||
pub role_update: Publisher<types::GuildRoleUpdate>,
|
||||
pub role_delete: Publisher<types::GuildRoleDelete>,
|
||||
pub role_scheduled_event_create: Publisher<types::GuildScheduledEventCreate>,
|
||||
pub role_scheduled_event_update: Publisher<types::GuildScheduledEventUpdate>,
|
||||
pub role_scheduled_event_delete: Publisher<types::GuildScheduledEventDelete>,
|
||||
pub role_scheduled_event_user_add: Publisher<types::GuildScheduledEventUserAdd>,
|
||||
pub role_scheduled_event_user_remove: Publisher<types::GuildScheduledEventUserRemove>,
|
||||
pub passive_update_v1: Publisher<types::PassiveUpdateV1>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Invite {
|
||||
pub create: GatewayEvent<types::InviteCreate>,
|
||||
pub delete: GatewayEvent<types::InviteDelete>,
|
||||
pub create: Publisher<types::InviteCreate>,
|
||||
pub delete: Publisher<types::InviteDelete>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Integration {
|
||||
pub create: GatewayEvent<types::IntegrationCreate>,
|
||||
pub update: GatewayEvent<types::IntegrationUpdate>,
|
||||
pub delete: GatewayEvent<types::IntegrationDelete>,
|
||||
pub create: Publisher<types::IntegrationCreate>,
|
||||
pub update: Publisher<types::IntegrationUpdate>,
|
||||
pub delete: Publisher<types::IntegrationDelete>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Interaction {
|
||||
pub create: GatewayEvent<types::InteractionCreate>,
|
||||
pub create: Publisher<types::InteractionCreate>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Call {
|
||||
pub create: GatewayEvent<types::CallCreate>,
|
||||
pub update: GatewayEvent<types::CallUpdate>,
|
||||
pub delete: GatewayEvent<types::CallDelete>,
|
||||
pub create: Publisher<types::CallCreate>,
|
||||
pub update: Publisher<types::CallUpdate>,
|
||||
pub delete: Publisher<types::CallDelete>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Voice {
|
||||
pub state_update: GatewayEvent<types::VoiceStateUpdate>,
|
||||
pub server_update: GatewayEvent<types::VoiceServerUpdate>,
|
||||
pub state_update: Publisher<types::VoiceStateUpdate>,
|
||||
pub server_update: Publisher<types::VoiceServerUpdate>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct Webhooks {
|
||||
pub update: GatewayEvent<types::WebhooksUpdate>,
|
||||
pub update: Publisher<types::WebhooksUpdate>,
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ use std::time::Duration;
|
|||
use flate2::Decompress;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use log::*;
|
||||
use pubserve::Publisher;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::task;
|
||||
|
||||
|
@ -197,7 +198,7 @@ impl Gateway {
|
|||
#[allow(dead_code)] // TODO: Remove this allow annotation
|
||||
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
|
||||
data: &'a str,
|
||||
event: &mut GatewayEvent<T>,
|
||||
event: &mut Publisher<T>,
|
||||
) -> Result<(), serde_json::Error> {
|
||||
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
|
||||
|
||||
|
@ -205,7 +206,7 @@ impl Gateway {
|
|||
return Err(data_deserialize_result.err().unwrap());
|
||||
}
|
||||
|
||||
event.notify(data_deserialize_result.unwrap()).await;
|
||||
event.publish(data_deserialize_result.unwrap()).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -253,7 +254,7 @@ impl Gateway {
|
|||
if let Some(error) = msg.error() {
|
||||
warn!("GW: Received error {:?}, connection will close..", error);
|
||||
self.close().await;
|
||||
self.events.lock().await.error.notify(error).await;
|
||||
self.events.lock().await.error.publish(error).await;
|
||||
} else {
|
||||
warn!(
|
||||
"Message unrecognised: {:?}, please open an issue on the chorus github",
|
||||
|
@ -292,7 +293,7 @@ impl Gateway {
|
|||
let id = if message.id().is_some() {
|
||||
message.id().unwrap()
|
||||
} else {
|
||||
event.notify(message).await;
|
||||
event.publish(message).await;
|
||||
return;
|
||||
};
|
||||
if let Some(to_update) = store.get(&id) {
|
||||
|
@ -314,7 +315,7 @@ impl Gateway {
|
|||
}
|
||||
}
|
||||
)?
|
||||
event.notify(message).await;
|
||||
event.publish(message).await;
|
||||
}
|
||||
}
|
||||
},)*
|
||||
|
@ -329,7 +330,7 @@ impl Gateway {
|
|||
return;
|
||||
}
|
||||
Ok(sessions) => {
|
||||
self.events.lock().await.session.replace.notify(
|
||||
self.events.lock().await.session.replace.publish(
|
||||
types::SessionsReplace {sessions}
|
||||
).await;
|
||||
}
|
||||
|
@ -446,7 +447,7 @@ impl Gateway {
|
|||
.await
|
||||
.session
|
||||
.reconnect
|
||||
.notify(reconnect)
|
||||
.publish(reconnect)
|
||||
.await;
|
||||
}
|
||||
GATEWAY_INVALID_SESSION => {
|
||||
|
@ -471,7 +472,7 @@ impl Gateway {
|
|||
.await
|
||||
.session
|
||||
.invalid
|
||||
.notify(invalid_session)
|
||||
.publish(invalid_session)
|
||||
.await;
|
||||
}
|
||||
// Starts our heartbeat
|
||||
|
|
|
@ -82,56 +82,3 @@ pub type ObservableObject = dyn Send + Sync + Any;
|
|||
pub trait Updateable: 'static + Send + Sync {
|
||||
fn id(&self) -> Snowflake;
|
||||
}
|
||||
|
||||
/// Trait which defines the behavior of an Observer. An Observer is an object which is subscribed to
|
||||
/// an Observable. The Observer is notified when the Observable's data changes.
|
||||
/// In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
|
||||
/// Note that `Debug` is used to tell `Observer`s apart when unsubscribing.
|
||||
#[async_trait]
|
||||
pub trait Observer<T>: Sync + Send + std::fmt::Debug {
|
||||
async fn update(&self, data: &T);
|
||||
}
|
||||
|
||||
/// GatewayEvent is a wrapper around a WebSocketEvent. It is used to notify the observers of a
|
||||
/// change in the WebSocketEvent. GatewayEvents are observable.
|
||||
#[derive(Default, Debug)]
|
||||
pub struct GatewayEvent<T: WebSocketEvent> {
|
||||
observers: Vec<Arc<dyn Observer<T>>>,
|
||||
}
|
||||
|
||||
impl<T: WebSocketEvent> GatewayEvent<T> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
observers: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the GatewayEvent is observed by at least one Observer.
|
||||
pub fn is_observed(&self) -> bool {
|
||||
!self.observers.is_empty()
|
||||
}
|
||||
|
||||
/// Subscribes an Observer to the GatewayEvent.
|
||||
pub fn subscribe(&mut self, observable: Arc<dyn Observer<T>>) {
|
||||
self.observers.push(observable);
|
||||
}
|
||||
|
||||
/// Unsubscribes an Observer from the GatewayEvent.
|
||||
pub fn unsubscribe(&mut self, observable: &dyn Observer<T>) {
|
||||
// .retain()'s closure retains only those elements of the vector, which have a different
|
||||
// pointer value than observable.
|
||||
// The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
|
||||
// anddd there is no way to do that without using format
|
||||
let to_remove = format!("{:?}", observable);
|
||||
self.observers
|
||||
.retain(|obs| format!("{:?}", obs) != to_remove);
|
||||
}
|
||||
|
||||
/// Notifies the observers of the GatewayEvent.
|
||||
pub(crate) async fn notify(&self, new_event_data: T) {
|
||||
for observer in &self.observers {
|
||||
observer.update(&new_event_data).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use log::{self, debug};
|
||||
use reqwest::{Client, RequestBuilder, Response};
|
||||
use serde::Deserialize;
|
||||
use serde_json::from_str;
|
||||
|
|
|
@ -2,9 +2,10 @@
|
|||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
use pubserve::Publisher;
|
||||
|
||||
use crate::{
|
||||
errors::VoiceGatewayError,
|
||||
gateway::GatewayEvent,
|
||||
types::{
|
||||
SessionDescription, SessionUpdate, Speaking, SsrcDefinition, VoiceBackendVersion,
|
||||
VoiceClientConnectFlags, VoiceClientConnectPlatform, VoiceClientDisconnection,
|
||||
|
@ -14,15 +15,15 @@ use crate::{
|
|||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct VoiceEvents {
|
||||
pub voice_ready: GatewayEvent<VoiceReady>,
|
||||
pub backend_version: GatewayEvent<VoiceBackendVersion>,
|
||||
pub session_description: GatewayEvent<SessionDescription>,
|
||||
pub session_update: GatewayEvent<SessionUpdate>,
|
||||
pub speaking: GatewayEvent<Speaking>,
|
||||
pub ssrc_definition: GatewayEvent<SsrcDefinition>,
|
||||
pub client_disconnect: GatewayEvent<VoiceClientDisconnection>,
|
||||
pub client_connect_flags: GatewayEvent<VoiceClientConnectFlags>,
|
||||
pub client_connect_platform: GatewayEvent<VoiceClientConnectPlatform>,
|
||||
pub media_sink_wants: GatewayEvent<VoiceMediaSinkWants>,
|
||||
pub error: GatewayEvent<VoiceGatewayError>,
|
||||
pub voice_ready: Publisher<VoiceReady>,
|
||||
pub backend_version: Publisher<VoiceBackendVersion>,
|
||||
pub session_description: Publisher<SessionDescription>,
|
||||
pub session_update: Publisher<SessionUpdate>,
|
||||
pub speaking: Publisher<Speaking>,
|
||||
pub ssrc_definition: Publisher<SsrcDefinition>,
|
||||
pub client_disconnect: Publisher<VoiceClientDisconnection>,
|
||||
pub client_connect_flags: Publisher<VoiceClientConnectFlags>,
|
||||
pub client_connect_platform: Publisher<VoiceClientConnectPlatform>,
|
||||
pub media_sink_wants: Publisher<VoiceMediaSinkWants>,
|
||||
pub error: Publisher<VoiceGatewayError>,
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ use std::{sync::Arc, time::Duration};
|
|||
|
||||
use log::*;
|
||||
|
||||
use pubserve::Publisher;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use futures_util::SinkExt;
|
||||
|
@ -16,7 +17,6 @@ use crate::gateway::Stream;
|
|||
use crate::gateway::WebSocketBackend;
|
||||
use crate::{
|
||||
errors::VoiceGatewayError,
|
||||
gateway::GatewayEvent,
|
||||
types::{
|
||||
VoiceGatewayReceivePayload, VoiceHelloData, WebSocketEvent, VOICE_BACKEND_VERSION,
|
||||
VOICE_CLIENT_CONNECT_FLAGS, VOICE_CLIENT_CONNECT_PLATFORM, VOICE_CLIENT_DISCONNECT,
|
||||
|
@ -160,7 +160,7 @@ impl VoiceGateway {
|
|||
/// (Called for every event in handle_message)
|
||||
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
|
||||
data: &'a str,
|
||||
event: &mut GatewayEvent<T>,
|
||||
event: &mut Publisher<T>,
|
||||
) -> Result<(), serde_json::Error> {
|
||||
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
|
||||
|
||||
|
@ -168,7 +168,7 @@ impl VoiceGateway {
|
|||
return Err(data_deserialize_result.err().unwrap());
|
||||
}
|
||||
|
||||
event.notify(data_deserialize_result.unwrap()).await;
|
||||
event.publish(data_deserialize_result.unwrap()).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -182,7 +182,7 @@ impl VoiceGateway {
|
|||
if let Some(error) = msg.error() {
|
||||
warn!("GW: Received error {:?}, connection will close..", error);
|
||||
self.close().await;
|
||||
self.events.lock().await.error.notify(error).await;
|
||||
self.events.lock().await.error.publish(error).await;
|
||||
} else {
|
||||
warn!(
|
||||
"Message unrecognised: {:?}, please open an issue on the chorus github",
|
||||
|
|
|
@ -3,23 +3,24 @@
|
|||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
use discortp::{rtcp::Rtcp, rtp::Rtp};
|
||||
use pubserve::Publisher;
|
||||
|
||||
use crate::{gateway::GatewayEvent, types::WebSocketEvent};
|
||||
use crate::types::WebSocketEvent;
|
||||
|
||||
impl WebSocketEvent for Rtp {}
|
||||
impl WebSocketEvent for Rtcp {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct VoiceUDPEvents {
|
||||
pub rtp: GatewayEvent<Rtp>,
|
||||
pub rtcp: GatewayEvent<Rtcp>,
|
||||
pub rtp: Publisher<Rtp>,
|
||||
pub rtcp: Publisher<Rtcp>,
|
||||
}
|
||||
|
||||
impl Default for VoiceUDPEvents {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
rtp: GatewayEvent::new(),
|
||||
rtcp: GatewayEvent::new(),
|
||||
rtp: Publisher::new(),
|
||||
rtcp: Publisher::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,7 +214,7 @@ impl UdpHandler {
|
|||
.lock()
|
||||
.await
|
||||
.rtp
|
||||
.notify(rtp_with_decrypted_data)
|
||||
.publish(rtp_with_decrypted_data)
|
||||
.await;
|
||||
}
|
||||
Demuxed::Rtcp(rtcp) => {
|
||||
|
@ -251,7 +251,7 @@ impl UdpHandler {
|
|||
}
|
||||
};
|
||||
|
||||
self.events.lock().await.rtcp.notify(rtcp_data).await;
|
||||
self.events.lock().await.rtcp.publish(rtcp_data).await;
|
||||
}
|
||||
Demuxed::FailedParse(e) => {
|
||||
trace!("VUDP: Failed to parse packet: {:?}", e);
|
||||
|
|
|
@ -14,6 +14,7 @@ use chorus::types::{
|
|||
self, Channel, ChannelCreateSchema, ChannelModifySchema, GatewayReady, IntoShared,
|
||||
RoleCreateModifySchema, RoleObject,
|
||||
};
|
||||
use pubserve::Subscriber;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_bindgen_test::*;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
|
@ -30,7 +31,9 @@ use wasmtimer::tokio::sleep;
|
|||
async fn test_gateway_establish() {
|
||||
let bundle = common::setup().await;
|
||||
|
||||
let _: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()).await.unwrap();
|
||||
let _: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default())
|
||||
.await
|
||||
.unwrap();
|
||||
common::teardown(bundle).await
|
||||
}
|
||||
|
||||
|
@ -40,7 +43,7 @@ struct GatewayReadyObserver {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Observer<GatewayReady> for GatewayReadyObserver {
|
||||
impl Subscriber<GatewayReady> for GatewayReadyObserver {
|
||||
async fn update(&self, _data: &GatewayReady) {
|
||||
self.channel.send(()).await.unwrap();
|
||||
}
|
||||
|
@ -52,7 +55,9 @@ impl Observer<GatewayReady> for GatewayReadyObserver {
|
|||
async fn test_gateway_authenticate() {
|
||||
let bundle = common::setup().await;
|
||||
|
||||
let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default()).await.unwrap();
|
||||
let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone(), GatewayOptions::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
|
|
Loading…
Reference in New Issue