Attempted reimpl

This commit is contained in:
kozabrada123 2023-05-11 22:47:31 +02:00
parent 9962dfdf81
commit 4008d36763
4 changed files with 270 additions and 214 deletions

View File

@ -8,7 +8,7 @@ pub mod login {
use crate::errors::InstanceServerError; use crate::errors::InstanceServerError;
use crate::instance::Instance; use crate::instance::Instance;
impl<'a> Instance<'a> { impl Instance {
pub async fn login_account( pub async fn login_account(
&mut self, &mut self,
login_schema: &LoginSchema, login_schema: &LoginSchema,

View File

@ -8,7 +8,7 @@ pub mod register {
instance::Instance, instance::Instance,
}; };
impl<'a> Instance<'a> { impl Instance {
/** /**
Registers a new user on the Spacebar server. Registers a new user on the Spacebar server.
# Arguments # Arguments

View File

@ -18,200 +18,51 @@ use tokio::time::Instant;
use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::{WebSocketStream, Connector, connect_async_tls_with_config}; use tokio_tungstenite::{WebSocketStream, Connector, connect_async_tls_with_config};
#[derive(Debug)]
/** /**
Represents a Gateway connection. A Gateway connection will create observable Represents a handle to a Gateway connection. A Gateway connection will create observable
[`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently [`GatewayEvents`](GatewayEvent), which you can subscribe to. Gateway events include all currently
implemented [Types] with the trait [`WebSocketEvent`] implemented [Types] with the trait [`WebSocketEvent`]
Using this handle you can also send Gateway Events directly.
*/ */
pub struct Gateway<'a> { pub struct GatewayHandle {
pub url: String, pub url: String,
pub events: Events<'a>, pub events: Arc<Mutex<Events>>,
websocket: WebSocketConnection, pub websocket_tx: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tokio_tungstenite::tungstenite::Message>>>,
heartbeat_handler: Option<HeartbeatHandler>
} }
impl<'a> Gateway<'a> { impl GatewayHandle {
pub async fn new(
websocket_url: String,
) -> Result<Gateway<'a>, tokio_tungstenite::tungstenite::Error> {
return Ok(Gateway {
url: websocket_url.clone(),
events: Events::default(),
websocket: WebSocketConnection::new(websocket_url).await,
heartbeat_handler: None,
});
}
/// This function reads all messages from the gateway's websocket and updates its events along with the events' observers
pub async fn update_events(&mut self) {
while let Ok(msg) = self.websocket.rx.lock().await.try_recv() {
if msg.to_string() == String::new() {
continue;
}
let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap();
// See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes
match gateway_payload.op {
// Dispatch
// An event was dispatched, we need to look at the gateway event name t
0 => {
let gateway_payload_t = gateway_payload.t.unwrap();
println!("GW: Received {}..", gateway_payload_t);
// See https://discord.com/developers/docs/topics/gateway-events#receive-events
match gateway_payload_t.as_str() {
"READY" => {
let data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
}
"APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {}
"AUTO_MODERATION_RULE_CREATE" => {}
"AUTO_MODERATION_RULE_UPDATE" => {}
"AUTO_MODERATION_RULE_DELETE" => {}
"AUTO_MODERATION_ACTION_EXECUTION" => {}
"CHANNEL_CREATE" => {}
"CHANNEL_UPDATE" => {}
"CHANNEL_DELETE" => {}
"CHANNEL_PINS_UPDATE" => {}
"THREAD_CREATE" => {}
"THREAD_UPDATE" => {}
"THREAD_DELETE" => {}
"THREAD_LIST_SYNC" => {}
"THREAD_MEMBER_UPDATE" => {}
"THREAD_MEMBERS_UPDATE" => {}
"GUILD_CREATE" => {}
"GUILD_UPDATE" => {}
"GUILD_DELETE" => {}
"GUILD_AUDIT_LOG_ENTRY_CREATE" => {}
"GUILD_BAN_ADD" => {}
"GUILD_BAN_REMOVE" => {}
"GUILD_EMOJIS_UPDATE" => {}
"GUILD_STICKERS_UPDATE" => {}
"GUILD_INTEGRATIONS_UPDATE" => {}
"GUILD_MEMBER_ADD" => {}
"GUILD_MEMBER_REMOVE" => {}
"GUILD_MEMBER_UPDATE" => {}
"GUILD_MEMBERS_CHUNK" => {}
"GUILD_ROLE_CREATE" => {}
"GUILD_ROLE_UPDATE" => {}
"GUILD_ROLE_DELETE" => {}
"GUILD_SCHEDULED_EVENT_CREATE" => {}
"GUILD_SCHEDULED_EVENT_UPDATE" => {}
"GUILD_SCHEDULED_EVENT_DELETE" => {}
"GUILD_SCHEDULED_EVENT_USER_ADD" => {}
"GUILD_SCHEDULED_EVENT_USER_REMOVE" => {}
"INTEGRATION_CREATE" => {}
"INTEGRATION_UPDATE" => {}
"INTEGRATION_DELETE" => {}
"INTERACTION_CREATE" => {}
"INVITE_CREATE" => {}
"INVITE_DELETE" => {}
"MESSAGE_CREATE" => {
let new_data: MessageCreate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.message.create.update_data(new_data);
}
"MESSAGE_UPDATE" => {
let new_data: MessageUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.message.update.update_data(new_data);
}
"MESSAGE_DELETE" => {
let new_data: MessageDelete = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.message.delete.update_data(new_data);
}
"MESSAGE_DELETE_BULK" => {
let new_data: MessageDeleteBulk = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.message.delete_bulk.update_data(new_data);
}
"MESSAGE_REACTION_ADD" => {
let new_data: MessageReactionAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.message.reaction_add.update_data(new_data);
}
"MESSAGE_REACTION_REMOVE" => {
let new_data: MessageReactionRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.message.reaction_remove.update_data(new_data);
}
"MESSAGE_REACTION_REMOVE_ALL" => {
let new_data: MessageReactionRemoveAll = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.message.reaction_remove_all.update_data(new_data);
}
"MESSAGE_REACTION_REMOVE_EMOJI" => {
let new_data: MessageReactionRemoveEmoji= serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.message.reaction_remove_emoji.update_data(new_data);
}
"PRESENCE_UPDATE" => {
let new_data: PresenceUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.user.presence_update.update_data(new_data);
}
"STAGE_INSTANCE_CREATE" => {}
"STAGE_INSTANCE_UPDATE" => {}
"STAGE_INSTANCE_DELETE" => {}
// Not documented in discord docs, I assume this isnt for bots / apps but is for users?
"SESSIONS_REPLACE" => {}
"TYPING_START" => {
let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.user.typing_start_event.update_data(new_data);
}
"USER_UPDATE" => {}
"VOICE_STATE_UPDATE" => {}
"VOICE_SERVER_UPDATE" => {}
"WEBHOOKS_UPDATE" => {}
_ => {panic!("Invalid gateway event ({})", &gateway_payload_t)}
}
}
// Heartbeat
// We received a heartbeat from the server
1 => {}
// Reconnect
7 => {todo!()}
// Invalid Session
9 => {todo!()}
// Hello
// Starts our heartbeat
10 => {
println!("GW: Received Hello");
let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket.tx.clone()));
}
// Heartbeat ACK
11 => {
println!("GW: Received Heartbeat ACK");
}
2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)}
_ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)}
}
// If we have an active heartbeat thread and we received a seq number we should let it know
if gateway_payload.s.is_some() {
if self.heartbeat_handler.is_some() {
let heartbeat_communication = HeartbeatThreadCommunication { op: gateway_payload.op, d: gateway_payload.s.unwrap() };
self.heartbeat_handler.as_mut().unwrap().tx.send(heartbeat_communication).await.unwrap();
}
}
}
}
/// Sends json to the gateway with an opcode /// Sends json to the gateway with an opcode
async fn send_json_event(&self, op: u8, to_send: serde_json::Value) { async fn send_json_event(&self, op: u8, to_send: serde_json::Value) {
println!("1");
let gateway_payload = GatewayPayload { op, d: Some(to_send), s: None, t: None }; let gateway_payload = GatewayPayload { op, d: Some(to_send), s: None, t: None };
println!("2");
let payload_json = serde_json::to_string(&gateway_payload).unwrap(); let payload_json = serde_json::to_string(&gateway_payload).unwrap();
println!("3");
let message = tokio_tungstenite::tungstenite::Message::text(payload_json); let message = tokio_tungstenite::tungstenite::Message::text(payload_json);
self.websocket.tx.lock().await.send(message).await.unwrap(); println!("4");
self.websocket_tx.lock().await.send(message).await.unwrap();
println!("5");
} }
/// Sends an identify event to the gateway /// Sends an identify event to the gateway
pub async fn send_identify(&self, to_send: GatewayIdentifyPayload) { pub async fn send_identify(&self, to_send: GatewayIdentifyPayload) {
println!("0.1");
let to_send_value = serde_json::to_value(&to_send).unwrap(); let to_send_value = serde_json::to_value(&to_send).unwrap();
println!("0.2");
println!("GW: Sending Identify.."); println!("GW: Sending Identify..");
self.send_json_event(2, to_send_value).await; self.send_json_event(2, to_send_value).await;
@ -236,6 +87,209 @@ impl<'a> Gateway<'a> {
} }
} }
pub struct Gateway {
pub events: Arc<Mutex<Events>>,
heartbeat_handler: Option<HeartbeatHandler>,
pub websocket_tx: Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tokio_tungstenite::tungstenite::Message>>>
}
impl<'a> Gateway {
pub async fn new(
websocket_url: String,
) -> Result<GatewayHandle, tokio_tungstenite::tungstenite::Error> {
let (ws_stream, _) = match connect_async_tls_with_config(
&websocket_url,
None,
Some(Connector::NativeTls(
TlsConnector::builder().build().unwrap(),
)),
)
.await
{
Ok(ws_stream) => ws_stream,
Err(e) => panic!("{:?}", e),
};
let (ws_tx, mut ws_rx) = ws_stream.split();
let shared_tx = Arc::new(Mutex::new(ws_tx));
let mut gateway = Gateway { events: Arc::new(Mutex::new(Events::default())), heartbeat_handler: None, websocket_tx: shared_tx.clone() };
let shared_events = gateway.events.clone();
task::spawn(async move {
loop {
println!("Waiting for next event..");
let msg = ws_rx.next().await;
println!("Received event or sth");
if msg.as_ref().is_some() {
let msg_unwrapped = msg.unwrap().unwrap();
gateway.handle_event(msg_unwrapped).await;
println!("Handled the event");
};
}
});
return Ok(GatewayHandle {
url: websocket_url.clone(),
events: shared_events,
websocket_tx: shared_tx.clone(),
});
}
/// This handles a message as a websocket event and updates its events along with the events' observers
pub async fn handle_event(&mut self, msg: tokio_tungstenite::tungstenite::Message) {
if msg.to_string() == String::new() {
return;
}
let gateway_payload: GatewayPayload = serde_json::from_str(msg.to_text().unwrap()).unwrap();
// See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes
match gateway_payload.op {
// Dispatch
// An event was dispatched, we need to look at the gateway event name t
0 => {
let gateway_payload_t = gateway_payload.t.unwrap();
println!("GW: Received {}..", gateway_payload_t);
// See https://discord.com/developers/docs/topics/gateway-events#receive-events
match gateway_payload_t.as_str() {
"READY" => {
let data: GatewayReady = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
}
"APPLICATION_COMMAND_PERMISSIONS_UPDATE" => {}
"AUTO_MODERATION_RULE_CREATE" => {}
"AUTO_MODERATION_RULE_UPDATE" => {}
"AUTO_MODERATION_RULE_DELETE" => {}
"AUTO_MODERATION_ACTION_EXECUTION" => {}
"CHANNEL_CREATE" => {}
"CHANNEL_UPDATE" => {}
"CHANNEL_DELETE" => {}
"CHANNEL_PINS_UPDATE" => {}
"THREAD_CREATE" => {}
"THREAD_UPDATE" => {}
"THREAD_DELETE" => {}
"THREAD_LIST_SYNC" => {}
"THREAD_MEMBER_UPDATE" => {}
"THREAD_MEMBERS_UPDATE" => {}
"GUILD_CREATE" => {}
"GUILD_UPDATE" => {}
"GUILD_DELETE" => {}
"GUILD_AUDIT_LOG_ENTRY_CREATE" => {}
"GUILD_BAN_ADD" => {}
"GUILD_BAN_REMOVE" => {}
"GUILD_EMOJIS_UPDATE" => {}
"GUILD_STICKERS_UPDATE" => {}
"GUILD_INTEGRATIONS_UPDATE" => {}
"GUILD_MEMBER_ADD" => {}
"GUILD_MEMBER_REMOVE" => {}
"GUILD_MEMBER_UPDATE" => {}
"GUILD_MEMBERS_CHUNK" => {}
"GUILD_ROLE_CREATE" => {}
"GUILD_ROLE_UPDATE" => {}
"GUILD_ROLE_DELETE" => {}
"GUILD_SCHEDULED_EVENT_CREATE" => {}
"GUILD_SCHEDULED_EVENT_UPDATE" => {}
"GUILD_SCHEDULED_EVENT_DELETE" => {}
"GUILD_SCHEDULED_EVENT_USER_ADD" => {}
"GUILD_SCHEDULED_EVENT_USER_REMOVE" => {}
"INTEGRATION_CREATE" => {}
"INTEGRATION_UPDATE" => {}
"INTEGRATION_DELETE" => {}
"INTERACTION_CREATE" => {}
"INVITE_CREATE" => {}
"INVITE_DELETE" => {}
"MESSAGE_CREATE" => {
let new_data: MessageCreate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.message.create.update_data(new_data).await;
}
"MESSAGE_UPDATE" => {
let new_data: MessageUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.message.update.update_data(new_data).await;
}
"MESSAGE_DELETE" => {
let new_data: MessageDelete = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.message.delete.update_data(new_data).await;
}
"MESSAGE_DELETE_BULK" => {
let new_data: MessageDeleteBulk = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.message.delete_bulk.update_data(new_data).await;
}
"MESSAGE_REACTION_ADD" => {
let new_data: MessageReactionAdd = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.message.reaction_add.update_data(new_data).await;
}
"MESSAGE_REACTION_REMOVE" => {
let new_data: MessageReactionRemove = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.message.reaction_remove.update_data(new_data).await;
}
"MESSAGE_REACTION_REMOVE_ALL" => {
let new_data: MessageReactionRemoveAll = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.message.reaction_remove_all.update_data(new_data).await;
}
"MESSAGE_REACTION_REMOVE_EMOJI" => {
let new_data: MessageReactionRemoveEmoji= serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.message.reaction_remove_emoji.update_data(new_data).await;
}
"PRESENCE_UPDATE" => {
let new_data: PresenceUpdate = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.user.presence_update.update_data(new_data).await;
}
"STAGE_INSTANCE_CREATE" => {}
"STAGE_INSTANCE_UPDATE" => {}
"STAGE_INSTANCE_DELETE" => {}
// Not documented in discord docs, I assume this isnt for bots / apps but is for users?
"SESSIONS_REPLACE" => {}
"TYPING_START" => {
let new_data: TypingStartEvent = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.events.lock().await.user.typing_start_event.update_data(new_data).await;
}
"USER_UPDATE" => {}
"VOICE_STATE_UPDATE" => {}
"VOICE_SERVER_UPDATE" => {}
"WEBHOOKS_UPDATE" => {}
_ => {panic!("Invalid gateway event ({})", &gateway_payload_t)}
}
}
// Heartbeat
// We received a heartbeat from the server
1 => {}
// Reconnect
7 => {todo!()}
// Invalid Session
9 => {todo!()}
// Hello
// Starts our heartbeat
10 => {
println!("GW: Received Hello");
let gateway_hello: HelloData = serde_json::from_value(gateway_payload.d.unwrap()).unwrap();
self.heartbeat_handler = Some(HeartbeatHandler::new(gateway_hello.heartbeat_interval, self.websocket_tx.clone()));
}
// Heartbeat ACK
11 => {
println!("GW: Received Heartbeat ACK");
}
2 | 3 | 4 | 6 | 8 => {panic!("Received Gateway op code that's meant to be sent, not received ({})", gateway_payload.op)}
_ => {panic!("Received Invalid Gateway op code ({})", gateway_payload.op)}
}
// If we have an active heartbeat thread and we received a seq number we should let it know
if gateway_payload.s.is_some() {
if self.heartbeat_handler.is_some() {
let heartbeat_communication = HeartbeatThreadCommunication { op: gateway_payload.op, d: gateway_payload.s.unwrap() };
self.heartbeat_handler.as_mut().unwrap().tx.send(heartbeat_communication).await.unwrap();
}
}
}
}
/** /**
Handles sending heartbeats to the gateway in another thread Handles sending heartbeats to the gateway in another thread
*/ */
@ -357,7 +411,7 @@ Trait which defines the behaviour of an Observer. An Observer is an object which
an Observable. The Observer is notified when the Observable's data changes. an Observable. The Observer is notified when the Observable's data changes.
In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent. In this case, the Observable is a [`GatewayEvent`], which is a wrapper around a WebSocketEvent.
*/ */
pub trait Observer<T: WebSocketEvent> { pub trait Observer<T: WebSocketEvent>: std::fmt::Debug {
fn update(&self, data: &T); fn update(&self, data: &T);
} }
@ -365,14 +419,14 @@ pub trait Observer<T: WebSocketEvent> {
change in the WebSocketEvent. GatewayEvents are observable. change in the WebSocketEvent. GatewayEvents are observable.
*/ */
#[derive(Default)] #[derive(Default, Debug)]
pub struct GatewayEvent<'a, T: WebSocketEvent> { pub struct GatewayEvent<T: WebSocketEvent> {
observers: Vec<&'a dyn Observer<T>>, observers: Vec<Arc<Mutex<dyn Observer<T> + Sync + Send>>>,
pub event_data: T, pub event_data: T,
pub is_observed: bool, pub is_observed: bool,
} }
impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> { impl<T: WebSocketEvent> GatewayEvent<T> {
fn new(event_data: T) -> Self { fn new(event_data: T) -> Self {
Self { Self {
is_observed: false, is_observed: false,
@ -395,7 +449,7 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> {
Returns an error if the GatewayEvent is already observed. Returns an error if the GatewayEvent is already observed.
Error type: [`ObserverError::AlreadySubscribedError`] Error type: [`ObserverError::AlreadySubscribedError`]
*/ */
pub fn subscribe(&mut self, observable: &'a dyn Observer<T>) -> Option<ObserverError> { pub fn subscribe(&mut self, observable: Arc<Mutex<dyn Observer<T> + Sync + Send>>) -> Option<ObserverError> {
if self.is_observed { if self.is_observed {
return Some(ObserverError::AlreadySubscribedError); return Some(ObserverError::AlreadySubscribedError);
} }
@ -407,57 +461,59 @@ impl<'a, T: WebSocketEvent> GatewayEvent<'a, T> {
/** /**
Unsubscribes an Observer from the GatewayEvent. Unsubscribes an Observer from the GatewayEvent.
*/ */
pub fn unsubscribe(&mut self, observable: &'a dyn Observer<T>) { pub fn unsubscribe(&mut self, observable: Arc<Mutex<dyn Observer<T> + Sync + Send>>) {
// .retain()'s closure retains only those elements of the vector, which have a different // .retain()'s closure retains only those elements of the vector, which have a different
// pointer value than observable. // pointer value than observable.
self.observers.retain(|obs| !std::ptr::eq(*obs, observable)); // The usage of the debug format to compare the generic T of observers is quite stupid, but the only thing to compare between them is T and if T == T they are the same
// anddd there is no way to do that without using format
self.observers.retain(|obs| !(format!("{:?}", obs) == format!("{:?}", &observable)));
self.is_observed = !self.observers.is_empty(); self.is_observed = !self.observers.is_empty();
} }
/** /**
Updates the GatewayEvent's data and notifies the observers. Updates the GatewayEvent's data and notifies the observers.
*/ */
fn update_data(&mut self, new_event_data: T) { async fn update_data(&mut self, new_event_data: T) {
self.event_data = new_event_data; self.event_data = new_event_data;
self.notify(); self.notify().await;
} }
/** /**
Notifies the observers of the GatewayEvent. Notifies the observers of the GatewayEvent.
*/ */
fn notify(&self) { async fn notify(&self) {
for observer in &self.observers { for observer in &self.observers {
observer.update(&self.event_data); observer.lock().await.update(&self.event_data);
} }
} }
} }
mod events { mod events {
use super::*; use super::*;
#[derive(Default)] #[derive(Default, Debug)]
pub struct Events<'a> { pub struct Events {
pub message: Message<'a>, pub message: Message,
pub user: User<'a>, pub user: User,
pub gateway_identify_payload: GatewayEvent<'a, GatewayIdentifyPayload>, pub gateway_identify_payload: GatewayEvent<GatewayIdentifyPayload>,
pub gateway_resume: GatewayEvent<'a, GatewayResume>, pub gateway_resume: GatewayEvent<GatewayResume>,
} }
#[derive(Default)] #[derive(Default, Debug)]
pub struct Message<'a> { pub struct Message {
pub create: GatewayEvent<'a, MessageCreate>, pub create: GatewayEvent<MessageCreate>,
pub update: GatewayEvent<'a, MessageUpdate>, pub update: GatewayEvent<MessageUpdate>,
pub delete: GatewayEvent<'a, MessageDelete>, pub delete: GatewayEvent<MessageDelete>,
pub delete_bulk: GatewayEvent<'a, MessageDeleteBulk>, pub delete_bulk: GatewayEvent<MessageDeleteBulk>,
pub reaction_add: GatewayEvent<'a, MessageReactionAdd>, pub reaction_add: GatewayEvent<MessageReactionAdd>,
pub reaction_remove: GatewayEvent<'a, MessageReactionRemove>, pub reaction_remove: GatewayEvent<MessageReactionRemove>,
pub reaction_remove_all: GatewayEvent<'a, MessageReactionRemoveAll>, pub reaction_remove_all: GatewayEvent<MessageReactionRemoveAll>,
pub reaction_remove_emoji: GatewayEvent<'a, MessageReactionRemoveEmoji>, pub reaction_remove_emoji: GatewayEvent<MessageReactionRemoveEmoji>,
} }
#[derive(Default)] #[derive(Default, Debug)]
pub struct User<'a> { pub struct User {
pub presence_update: GatewayEvent<'a, PresenceUpdate>, pub presence_update: GatewayEvent<PresenceUpdate>,
pub typing_start_event: GatewayEvent<'a, TypingStartEvent>, pub typing_start_event: GatewayEvent<TypingStartEvent>,
} }
} }
@ -466,6 +522,7 @@ mod example {
use super::*; use super::*;
use crate::api::types::GatewayResume; use crate::api::types::GatewayResume;
#[derive(Debug)]
struct Consumer; struct Consumer;
impl Observer<GatewayResume> for Consumer { impl Observer<GatewayResume> for Consumer {
fn update(&self, data: &GatewayResume) { fn update(&self, data: &GatewayResume) {
@ -473,8 +530,8 @@ mod example {
} }
} }
#[test] #[tokio::test]
fn test_observer_behaviour() { async fn test_observer_behaviour() {
let mut event = GatewayEvent::new(GatewayResume { let mut event = GatewayEvent::new(GatewayResume {
token: "start".to_string(), token: "start".to_string(),
session_id: "start".to_string(), session_id: "start".to_string(),
@ -489,15 +546,15 @@ mod example {
let consumer = Consumer; let consumer = Consumer;
event.subscribe(&consumer); event.subscribe(Arc::new(Mutex::new(consumer)));
event.notify(); event.notify().await;
event.update_data(new_data); event.update_data(new_data).await;
let second_consumer = Consumer; let second_consumer = Consumer;
match event.subscribe(&second_consumer) { match event.subscribe(Arc::new(Mutex::new(second_consumer))) {
None => assert!(false), None => assert!(false),
Some(err) => println!("You cannot subscribe twice: {}", err), Some(err) => println!("You cannot subscribe twice: {}", err),
} }

View File

@ -1,12 +1,13 @@
use crate::api::limits::Limits; use crate::api::limits::Limits;
use crate::api::types::{InstancePolicies}; use crate::api::types::{InstancePolicies};
use crate::errors::{FieldFormatError, InstanceServerError}; use crate::errors::{FieldFormatError, InstanceServerError};
use crate::gateway::Gateway; use crate::gateway::{GatewayHandle, Gateway};
use crate::limit::LimitedRequester; use crate::limit::LimitedRequester;
use crate::URLBundle; use crate::URLBundle;
use std::fmt; use std::fmt;
#[derive(Debug)]
/** /**
The [`Instance`] what you will be using to perform all sorts of actions on the Spacebar server. The [`Instance`] what you will be using to perform all sorts of actions on the Spacebar server.
@ -16,7 +17,6 @@ pub struct Instance {
pub instance_info: InstancePolicies, pub instance_info: InstancePolicies,
pub requester: LimitedRequester, pub requester: LimitedRequester,
pub limits: Limits, pub limits: Limits,
pub gateway: Gateway,
} }
impl Instance { impl Instance {
@ -45,7 +45,6 @@ impl Instance {
), ),
limits: Limits::check_limits(urls.api).await, limits: Limits::check_limits(urls.api).await,
requester, requester,
gateway: Gateway::new(urls.wss.clone()).await.unwrap(),
}; };
instance.instance_info = match instance.instance_policies_schema().await { instance.instance_info = match instance.instance_policies_schema().await {
Ok(schema) => schema, Ok(schema) => schema,