Compare commits

...

101 Commits

Author SHA1 Message Date
kozabrada123 85fc9e2ca5
Merge 577726f3bd into 8e25f401a5 2024-01-21 21:19:03 +01:00
kozabrada123 577726f3bd fix: use ip discovery address as string, not as Vec<u8> 2024-01-20 12:18:03 +01:00
kozabrada123 eeb3b4a304 Merge branch 'dev' into feature/voice 2024-01-20 07:27:25 +01:00
kozabrada123 b3c1e37fa4 fix: unused import 2024-01-19 19:47:04 +01:00
kozabrada123 60c0c3c536 committed too much 2024-01-19 19:45:34 +01:00
kozabrada123 a787a989ef update voice heartbeat, fix the new test issue 2024-01-19 19:42:24 +01:00
kozabrada123 2bf022924b merge w/ dev 2024-01-19 19:24:59 +01:00
kozabrada123 badf3e9d47 testing tests 2024-01-18 17:10:11 +01:00
kozabrada123 f7a2285dd5 tests: better gateway auth test 2024-01-18 16:27:45 +01:00
kozabrada123 6f9ed86a4c chore: json isn't a doc test 2024-01-18 16:27:07 +01:00
kozabrada123 cadaca90a1 docs: fix doc warning, fix incorrect refrences to 'webrtc' 2024-01-18 12:24:19 +01:00
kozabrada123 0d8fc2410c normal tests work? 2024-01-12 18:41:10 +01:00
kozabrada123 b9e5ee6d16 tests: add nonce test 2024-01-12 18:26:27 +01:00
kozabrada123 c6919d464c Okay can't do that actually 2024-01-12 17:50:44 +01:00
kozabrada123 f2aa22329a drop buf asap 2024-01-12 17:45:49 +01:00
kozabrada123 7a3a7dcd8e chore: update on packet size FIXME 2024-01-12 17:40:40 +01:00
kozabrada123 7a41667ad4 chore: update getrandom version to match wasm version 2024-01-12 17:40:09 +01:00
kozabrada123 e950659785 chore: unused imports 2024-01-12 17:00:24 +01:00
kozabrada123 2b4d07d020 docs: document voice encryption modes 2024-01-12 16:57:51 +01:00
kozabrada123 81dfcb93f1 chore: merge dev, fix merge conflict 2024-01-12 16:49:01 +01:00
kozabrada123 03fd1a6787 feat: new encryption modes, minor code quality 2024-01-12 16:45:56 +01:00
kozabrada123 cdba76bcf9 api: split voice gateway and udp features, test for voice gateway in WASM 2023-12-30 13:17:12 +01:00
kozabrada123 2b729dc8fd chore: clarify UDP on WASM 2023-12-30 11:42:44 +01:00
kozabrada123 65213bb0fb fix: its the same 2023-12-29 12:54:46 +01:00
kozabrada123 e9ef2444d5 feat: udp error handling, create udp/backends 2023-12-29 12:49:03 +01:00
kozabrada123 2dadd38604
Merge branch 'dev' into feature/voice 2023-12-29 11:34:11 +01:00
kozabrada123 8413b66e22 chore: split voice udp 2023-12-29 11:33:14 +01:00
kozabrada123 9039e216be fix: properly using encrypted data, bad practice for buffer creation 2023-12-29 10:09:06 +01:00
kozabrada123 a5283c7780 fix: gateway connect using wrong url 2023-12-29 10:08:37 +01:00
kozabrada123 a5e4170641 fix: blunder 2023-12-28 09:29:49 +01:00
kozabrada123 ef4d6cffdb feat: first try at vgw wasm compat 2023-12-28 09:21:47 +01:00
kozabrada123 33400daa74 fix: duplicated gateway events 2023-12-28 09:21:08 +01:00
kozabrada123 a6d68383cc chore: yes clippy, you are special 2023-12-27 22:10:16 +01:00
kozabrada123 db4dcae579 feat: merge VoiceHandler into official development 2023-12-27 21:48:35 +01:00
kozabrada123 8aefa65093 chore: yes 2023-12-18 18:22:53 +01:00
kozabrada123 19dc9c8ffd feat: add sequence number 2023-12-17 13:47:11 +01:00
kozabrada123 3875e2e7ee small updates 2023-12-17 11:51:02 +01:00
kozabrada123 ba4818dbad feat: Public api! (sorta) 2023-12-16 21:56:14 +01:00
kozabrada123 17f5456841 feat: add untested sending & asbtract nonce generation 2023-12-16 20:19:09 +01:00
kozabrada123 2cd4dda9f4 feat: add ssrc definition (op 12) 2023-12-16 15:55:29 +01:00
kozabrada123 13c9e558fb chore: formatting 2023-12-16 15:31:48 +01:00
kozabrada123 c86a312615 feat: decryption? 2023-12-16 13:46:29 +01:00
kozabrada123 66f14a1c21 feat: add VoiceData reference to UdpHandler 2023-12-16 12:20:02 +01:00
kozabrada123 51ce2b8ef8 feat: add VoiceData struct 2023-12-16 12:19:19 +01:00
kozabrada123 5abd143145 chore: yes clippy, that is indeed an unneeded return statement 2023-12-16 11:38:40 +01:00
kozabrada123 6a5d58329d fix: attempt to fix failing wasm build 2023-12-16 11:25:33 +01:00
kozabrada123 a3ad3cce0b chore: clippy + other misc updates 2023-12-16 11:23:03 +01:00
kozabrada123 9eee1f74a3 chore: merge main 2023-12-16 10:30:01 +01:00
kozabrada123 03d47aebe8 Add UdpHandle 2023-12-16 09:40:47 +01:00
kozabrada123 b8d344d745 chore: rename events/webrtc to events/voice_gateway 2023-12-16 09:36:40 +01:00
kozabrada123 d3e0c82369 Merge with latest dev 2023-11-22 19:39:53 +01:00
kozabrada123 7b3beaf23c feat: add voice_media_sink_wants
(comitting uncommited changes to merge)
2023-11-22 19:27:46 +01:00
kozabrada123 355d3c49b8 Merge dev for #430 fix 2023-11-16 10:31:56 +01:00
kozabrada123 f5c5e1cc5e fix: voice works again 2023-11-16 10:17:45 +01:00
kozabrada123 4faf25165d Merge main 2023-11-16 09:59:09 +01:00
kozabrada123 9460219d14 feat: packet parsing! 2023-11-12 14:59:28 +01:00
kozabrada123 b973ecb447 feat: return ip discovery data + minor update 2023-11-12 13:33:29 +01:00
kozabrada123 8278cc2162 feat: kinda janky ip discovery impl 2023-11-12 12:54:32 +01:00
kozabrada123 fb42b6b713 fix: deserialization error in speaking bitflags 2023-11-12 10:53:23 +01:00
kozabrada123 b0ae700775 Restructure voice to new module 2023-11-12 10:52:42 +01:00
kozabrada123 ef1d314291 Create seperate voice_gateway.rs and voice_udp.rs 2023-11-12 10:50:07 +01:00
kozabrada123 fad04da125 Update to v7 2023-10-15 11:51:59 +02:00
kozabrada123 feb8d4610c Clarify FIXME related to #430 2023-10-15 09:47:36 +02:00
kozabrada123 fa3c3b76ae Update voice identify 2023-10-15 09:47:08 +02:00
kozabrada123 cf70147500 Fix error failing to 'deserialize' properly 2023-10-14 18:13:09 +02:00
kozabrada123 e5c4cc3df9 Voice gateway updates 2023-10-14 11:51:31 +02:00
kozabrada123 5608d96a5f Fix bad request in voice gateway init 2023-10-14 11:03:50 +02:00
kozabrada123 8ab75e313a ?? 2023-10-14 10:49:51 +02:00
kozabrada123 7b8bcffafa Event updates via the scientific method 2023-10-14 10:43:02 +02:00
kozabrada123 cdcc6a5270 Make voice event fields pub 2023-10-14 10:29:05 +02:00
kozabrada123 1639d4e00f Add default impl for voicegatewayerror 2023-10-14 10:25:28 +02:00
kozabrada123 e4f0a3840a Modernise voice gateway 2023-10-14 09:58:26 +02:00
kozabrada123 68b6ff4ca7 Minor doc fixes 2023-10-14 08:53:31 +02:00
kozabrada123 c732a97da6 275 commits behind ono 2023-10-14 08:41:26 +02:00
kozabrada123 818e4342cf Rebase 2023-10-14 08:19:35 +02:00
kozabrada123 795cd5b9b5 e 2023-08-29 18:25:21 +02:00
kozabrada123 bbe24d60b9 Small types update 2023-08-29 18:18:48 +02:00
kozabrada123 b04a906112 Even more derives 2023-08-29 18:16:45 +02:00
kozabrada123 2cd48a948c More derives 2023-08-29 17:53:48 +02:00
kozabrada123 cea362f506 Minor updates 2023-08-29 17:49:30 +02:00
kozabrada123 3b3ba4f3cf Test error observer 2023-08-29 14:44:47 +02:00
kozabrada123 5a4d3cba04 Rebase 2023-08-29 14:33:17 +02:00
kozabrada123 b2d125104a Same allow as for voice as normal gateway 2023-07-28 18:04:16 +02:00
kozabrada123 5940af777c Merge with main, but better this time 2023-07-28 18:01:52 +02:00
kozabrada123 89d4348498 Merge with main 2023-07-28 17:33:23 +02:00
kozabrada123 46aa437c8a Merge branch 'main' into perpetual/gateway-dev 2023-07-21 14:02:16 +02:00
kozabrada123 23186e22b1 Merge branch 'main' into perpetual/gateway-dev 2023-07-11 19:23:34 +02:00
kozabrada123 7de62f0152 Merge branch 'main' into perpetual/gateway-dev 2023-07-11 18:35:12 +02:00
kozabrada123 a07d9d7579 Merge branch 'perpetual/gateway-dev' into feature/voice 2023-06-23 09:29:56 +02:00
kozabrada123 3f24cd67f1 Merge branch 'main' into perpetual/gateway-dev 2023-06-23 09:29:27 +02:00
kozabrada123 37f6ea91a3 Merge branch 'perpetual/gateway-dev' into feature/voice 2023-06-22 09:14:07 +02:00
kozabrada123 19c1f3923f Merge branch 'main' into perpetual/gateway-dev 2023-06-22 09:13:37 +02:00
kozabrada123 30b0cb5296 Merge branch 'main' into feature/webrtc 2023-06-21 14:28:05 +02:00
kozabrada123 04923f7d09 Merge branch 'main' into perpetual/gateway-dev 2023-06-21 14:27:45 +02:00
kozabrada123 5e037121cd fmt 2023-06-21 14:26:52 +02:00
kozabrada123 9dc37c9469 Attempt an untested voice gateway implementation 2023-06-21 14:26:00 +02:00
kozabrada123 2ef07d965a Merge branch 'perpetual/gateway-dev' into feature/webrtc 2023-06-21 08:09:15 +02:00
kozabrada123 2d3f23744c Merge branch 'main' into perpetual/gateway-dev 2023-06-21 08:01:05 +02:00
kozabrada123 b4a4e1f5d5 Add more webrtc typings 2023-06-20 19:51:28 +02:00
kozabrada123 cfe4e2c7bb Merge branch 'main' into perpetual/gateway-dev 2023-06-20 19:12:39 +02:00
kozabrada123 1431aba363 Add Webrtc Identify & Ready 2023-06-20 19:12:21 +02:00
51 changed files with 2486 additions and 234 deletions

View File

@ -100,7 +100,7 @@ jobs:
rustup target add wasm32-unknown-unknown
curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash
cargo binstall --no-confirm wasm-bindgen-cli --version "0.2.88" --force
GECKODRIVER=$(which geckodriver) cargo test --target wasm32-unknown-unknown --no-default-features --features="client, rt"
GECKODRIVER=$(which geckodriver) cargo test --target wasm32-unknown-unknown --no-default-features --features="client, rt, voice_gateway"
wasm-chrome:
runs-on: macos-latest
steps:
@ -128,4 +128,4 @@ jobs:
rustup target add wasm32-unknown-unknown
curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash
cargo binstall --no-confirm wasm-bindgen-cli --version "0.2.88" --force
CHROMEDRIVER=$(which chromedriver) cargo test --target wasm32-unknown-unknown --no-default-features --features="client, rt"
CHROMEDRIVER=$(which chromedriver) cargo test --target wasm32-unknown-unknown --no-default-features --features="client, rt, voice_gateway"

131
Cargo.lock generated
View File

@ -17,6 +17,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common",
"generic-array",
]
[[package]]
name = "ahash"
version = "0.8.7"
@ -206,7 +216,9 @@ dependencies = [
"bitflags 2.4.1",
"chorus-macros",
"chrono",
"crypto_secretbox",
"custom_error",
"discortp",
"futures-util",
"getrandom",
"hostname",
@ -264,6 +276,17 @@ dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
"zeroize",
]
[[package]]
name = "console_error_panic_hook"
version = "0.1.7"
@ -342,9 +365,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"rand_core",
"typenum",
]
[[package]]
name = "crypto_secretbox"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d6cf87adf719ddf43a805e92c6870a531aedda35ff640442cbaf8674e141e1"
dependencies = [
"aead",
"cipher",
"generic-array",
"poly1305",
"salsa20",
"subtle",
"zeroize",
]
[[package]]
name = "custom_error"
version = "1.9.2"
@ -425,6 +464,16 @@ dependencies = [
"subtle",
]
[[package]]
name = "discortp"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524b9439c09174aede2c88d58cfc6b83575b06569d1af4d07562f76595b2896b"
dependencies = [
"pnet_macros",
"pnet_macros_support",
]
[[package]]
name = "dotenvy"
version = "0.15.7"
@ -643,6 +692,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
"zeroize",
]
[[package]]
@ -923,6 +973,15 @@ dependencies = [
"serde",
]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "ipnet"
version = "2.9.0"
@ -1123,6 +1182,12 @@ dependencies = [
"libc",
]
[[package]]
name = "no-std-net"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65"
[[package]]
name = "nom"
version = "7.1.3"
@ -1217,6 +1282,12 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.62"
@ -1363,6 +1434,36 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a"
[[package]]
name = "pnet_base"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d3a993d49e5fd5d4d854d6999d4addca1f72d86c65adf224a36757161c02b6"
dependencies = [
"no-std-net",
]
[[package]]
name = "pnet_macros"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48dd52a5211fac27e7acb14cfc9f30ae16ae0e956b7b779c8214c74559cef4c3"
dependencies = [
"proc-macro2",
"quote",
"regex",
"syn 1.0.109",
]
[[package]]
name = "pnet_macros_support"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89de095dc7739349559913aed1ef6a11e73ceade4897dadc77c5e09de6740750"
dependencies = [
"pnet_base",
]
[[package]]
name = "poem"
version = "1.3.59"
@ -1406,6 +1507,17 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "poly1305"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8159bd90725d2df49889a078b54f4f79e87f1f8a8444194cdca81d38f5393abf"
dependencies = [
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -1688,6 +1800,15 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "salsa20"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213"
dependencies = [
"cipher",
]
[[package]]
name = "schannel"
version = "0.1.23"
@ -2526,6 +2647,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "universal-hash"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"
dependencies = [
"crypto-common",
"subtle",
]
[[package]]
name = "untrusted"
version = "0.7.1"

View File

@ -16,6 +16,9 @@ backend = ["dep:poem", "dep:sqlx"]
rt-multi-thread = ["tokio/rt-multi-thread"]
rt = ["tokio/rt"]
client = []
voice = ["voice_udp", "voice_gateway"]
voice_udp = ["dep:discortp", "dep:crypto_secretbox"]
voice_gateway = []
[dependencies]
tokio = { version = "1.35.1", features = ["macros", "sync"] }
@ -52,6 +55,8 @@ sqlx = { version = "0.7.3", features = [
"runtime-tokio-native-tls",
"any",
], optional = true }
discortp = { version = "0.5.0", optional = true, features = ["rtp", "discord", "demux"] }
crypto_secretbox = {version = "0.1.1", optional = true}
rand = "0.8.5"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
@ -63,6 +68,7 @@ tokio-tungstenite = { version = "0.20.1", features = [
] }
native-tls = "0.2.11"
hostname = "0.3.1"
getrandom = { version = "0.2.12" }
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.12", features = ["js"] }

View File

@ -125,7 +125,7 @@ like "proxy connection checking" are already disabled on this version, which oth
### wasm
To test for wasm, you will need to `cargo install wasm-pack`. You can then run
`wasm-pack test --<chrome/firefox/safari> --headless -- --target wasm32-unknown-unknown --features="rt, client" --no-default-features`
`wasm-pack test --<chrome/firefox/safari> --headless -- --target wasm32-unknown-unknown --features="rt, client, voice_gateway" --no-default-features`
to run the tests for wasm.
## Versioning

View File

@ -9,7 +9,7 @@ impl Guild {
/// permission to be present on the current user.
///
/// If the guild/channel you are searching is not yet indexed, the endpoint will return a 202 accepted response.
/// In this case, the method will return a [`ChorusError::InvalidResponse`] error.
/// In this case, the method will return a [`ChorusError::InvalidResponse`](crate::errors::ChorusError::InvalidResponse) error.
///
/// # Reference:
/// See <https://discord-userdoccers.vercel.app/resources/message#search-messages>

View File

@ -63,7 +63,7 @@ custom_error! {
}
custom_error! {
/// For errors we receive from the gateway, see https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#gateway-close-event-codes;
/// For errors we receive from the gateway, see <https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#gateway-close-event-codes>;
///
/// Supposed to be sent as numbers, though they are sent as string most of the time?
///
@ -96,3 +96,58 @@ custom_error! {
}
impl WebSocketEvent for GatewayError {}
custom_error! {
/// Voice Gateway errors
///
/// Similar to [GatewayError].
///
/// See <https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes>;
#[derive(Clone, Default, PartialEq, Eq)]
pub VoiceGatewayError
// Errors we receive
#[default]
UnknownOpcode = "You sent an invalid opcode",
FailedToDecodePayload = "You sent an invalid payload in your identifying to the (Voice) Gateway",
NotAuthenticated = "You sent a payload before identifying with the (Voice) Gateway",
AuthenticationFailed = "The token you sent in your identify payload is incorrect",
AlreadyAuthenticated = "You sent more than one identify payload",
SessionNoLongerValid = "Your session is no longer valid",
SessionTimeout = "Your session has timed out",
ServerNotFound = "We can't find the server you're trying to connect to",
UnknownProtocol = "We didn't recognize the protocol you sent",
Disconnected = "Channel was deleted, you were kicked, voice server changed, or the main gateway session was dropped. Should not reconnect.",
VoiceServerCrashed = "The server crashed, try resuming",
UnknownEncryptionMode = "Server failed to decrypt data",
// Errors when initiating a gateway connection
CannotConnect{error: String} = "Cannot connect due to a tungstenite error: {error}",
NonHelloOnInitiate{opcode: u8} = "Received non hello on initial gateway connection ({opcode}), something is definitely wrong",
// Other misc errors
UnexpectedOpcodeReceived{opcode: u8} = "Received an opcode we weren't expecting to receive: {opcode}",
}
impl WebSocketEvent for VoiceGatewayError {}
custom_error! {
/// Voice UDP errors.
#[derive(Clone, PartialEq, Eq)]
pub VoiceUdpError
// General errors
BrokenSocket{error: String} = "Could not write / read from udp socket: {error}",
NoData = "We have not set received the necessary data to perform this operation.",
// Encryption errors
NoKey = "Tried to encrypt / decrypt rtp data, but no key has been received yet",
FailedEncryption = "Tried to encrypt rtp data, but failed. Most likely this is an issue chorus' nonce generation. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new",
FailedDecryption = "Tried to decrypt rtp data, but failed. Most likely this is an issue chorus' nonce generation. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new",
FailedNonceGeneration{error: String} = "Tried to generate nonce, but failed due to error: {error}.",
// Errors when initiating a socket connection
CannotBind{error: String} = "Cannot bind socket due to a udp error: {error}",
CannotConnect{error: String} = "Cannot connect due to a udp error: {error}",
}
impl WebSocketEvent for VoiceUdpError {}

View File

@ -5,7 +5,7 @@ use log::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
use self::event::Events;
use super::events::Events;
use super::*;
use super::{Sink, Stream};
use crate::types::{
@ -101,7 +101,6 @@ impl Gateway {
let msg = self.websocket_receive.next().await;
// PRETTYFYME: Remove inline conditional compiling
// This if chain can be much better but if let is unstable on stable rust
#[cfg(not(target_arch = "wasm32"))]
if let Some(Ok(message)) = msg {
self.handle_message(message.into()).await;
@ -394,165 +393,3 @@ impl Gateway {
}
}
}
pub mod event {
use super::*;
#[derive(Default, Debug)]
pub struct Events {
pub application: Application,
pub auto_moderation: AutoModeration,
pub session: Session,
pub message: Message,
pub user: User,
pub relationship: Relationship,
pub channel: Channel,
pub thread: Thread,
pub guild: Guild,
pub invite: Invite,
pub integration: Integration,
pub interaction: Interaction,
pub stage_instance: StageInstance,
pub call: Call,
pub voice: Voice,
pub webhooks: Webhooks,
pub gateway_identify_payload: GatewayEvent<types::GatewayIdentifyPayload>,
pub gateway_resume: GatewayEvent<types::GatewayResume>,
pub error: GatewayEvent<GatewayError>,
}
#[derive(Default, Debug)]
pub struct Application {
pub command_permissions_update: GatewayEvent<types::ApplicationCommandPermissionsUpdate>,
}
#[derive(Default, Debug)]
pub struct AutoModeration {
pub rule_create: GatewayEvent<types::AutoModerationRuleCreate>,
pub rule_update: GatewayEvent<types::AutoModerationRuleUpdate>,
pub rule_delete: GatewayEvent<types::AutoModerationRuleDelete>,
pub action_execution: GatewayEvent<types::AutoModerationActionExecution>,
}
#[derive(Default, Debug)]
pub struct Session {
pub ready: GatewayEvent<types::GatewayReady>,
pub ready_supplemental: GatewayEvent<types::GatewayReadySupplemental>,
pub replace: GatewayEvent<types::SessionsReplace>,
}
#[derive(Default, Debug)]
pub struct StageInstance {
pub create: GatewayEvent<types::StageInstanceCreate>,
pub update: GatewayEvent<types::StageInstanceUpdate>,
pub delete: GatewayEvent<types::StageInstanceDelete>,
}
#[derive(Default, Debug)]
pub struct Message {
pub create: GatewayEvent<types::MessageCreate>,
pub update: GatewayEvent<types::MessageUpdate>,
pub delete: GatewayEvent<types::MessageDelete>,
pub delete_bulk: GatewayEvent<types::MessageDeleteBulk>,
pub reaction_add: GatewayEvent<types::MessageReactionAdd>,
pub reaction_remove: GatewayEvent<types::MessageReactionRemove>,
pub reaction_remove_all: GatewayEvent<types::MessageReactionRemoveAll>,
pub reaction_remove_emoji: GatewayEvent<types::MessageReactionRemoveEmoji>,
pub ack: GatewayEvent<types::MessageACK>,
}
#[derive(Default, Debug)]
pub struct User {
pub update: GatewayEvent<types::UserUpdate>,
pub guild_settings_update: GatewayEvent<types::UserGuildSettingsUpdate>,
pub presence_update: GatewayEvent<types::PresenceUpdate>,
pub typing_start: GatewayEvent<types::TypingStartEvent>,
}
#[derive(Default, Debug)]
pub struct Relationship {
pub add: GatewayEvent<types::RelationshipAdd>,
pub remove: GatewayEvent<types::RelationshipRemove>,
}
#[derive(Default, Debug)]
pub struct Channel {
pub create: GatewayEvent<types::ChannelCreate>,
pub update: GatewayEvent<types::ChannelUpdate>,
pub unread_update: GatewayEvent<types::ChannelUnreadUpdate>,
pub delete: GatewayEvent<types::ChannelDelete>,
pub pins_update: GatewayEvent<types::ChannelPinsUpdate>,
}
#[derive(Default, Debug)]
pub struct Thread {
pub create: GatewayEvent<types::ThreadCreate>,
pub update: GatewayEvent<types::ThreadUpdate>,
pub delete: GatewayEvent<types::ThreadDelete>,
pub list_sync: GatewayEvent<types::ThreadListSync>,
pub member_update: GatewayEvent<types::ThreadMemberUpdate>,
pub members_update: GatewayEvent<types::ThreadMembersUpdate>,
}
#[derive(Default, Debug)]
pub struct Guild {
pub create: GatewayEvent<types::GuildCreate>,
pub update: GatewayEvent<types::GuildUpdate>,
pub delete: GatewayEvent<types::GuildDelete>,
pub audit_log_entry_create: GatewayEvent<types::GuildAuditLogEntryCreate>,
pub ban_add: GatewayEvent<types::GuildBanAdd>,
pub ban_remove: GatewayEvent<types::GuildBanRemove>,
pub emojis_update: GatewayEvent<types::GuildEmojisUpdate>,
pub stickers_update: GatewayEvent<types::GuildStickersUpdate>,
pub integrations_update: GatewayEvent<types::GuildIntegrationsUpdate>,
pub member_add: GatewayEvent<types::GuildMemberAdd>,
pub member_remove: GatewayEvent<types::GuildMemberRemove>,
pub member_update: GatewayEvent<types::GuildMemberUpdate>,
pub members_chunk: GatewayEvent<types::GuildMembersChunk>,
pub role_create: GatewayEvent<types::GuildRoleCreate>,
pub role_update: GatewayEvent<types::GuildRoleUpdate>,
pub role_delete: GatewayEvent<types::GuildRoleDelete>,
pub role_scheduled_event_create: GatewayEvent<types::GuildScheduledEventCreate>,
pub role_scheduled_event_update: GatewayEvent<types::GuildScheduledEventUpdate>,
pub role_scheduled_event_delete: GatewayEvent<types::GuildScheduledEventDelete>,
pub role_scheduled_event_user_add: GatewayEvent<types::GuildScheduledEventUserAdd>,
pub role_scheduled_event_user_remove: GatewayEvent<types::GuildScheduledEventUserRemove>,
pub passive_update_v1: GatewayEvent<types::PassiveUpdateV1>,
}
#[derive(Default, Debug)]
pub struct Invite {
pub create: GatewayEvent<types::InviteCreate>,
pub delete: GatewayEvent<types::InviteDelete>,
}
#[derive(Default, Debug)]
pub struct Integration {
pub create: GatewayEvent<types::IntegrationCreate>,
pub update: GatewayEvent<types::IntegrationUpdate>,
pub delete: GatewayEvent<types::IntegrationDelete>,
}
#[derive(Default, Debug)]
pub struct Interaction {
pub create: GatewayEvent<types::InteractionCreate>,
}
#[derive(Default, Debug)]
pub struct Call {
pub create: GatewayEvent<types::CallCreate>,
pub update: GatewayEvent<types::CallUpdate>,
pub delete: GatewayEvent<types::CallDelete>,
}
#[derive(Default, Debug)]
pub struct Voice {
pub state_update: GatewayEvent<types::VoiceStateUpdate>,
pub server_update: GatewayEvent<types::VoiceServerUpdate>,
}
#[derive(Default, Debug)]
pub struct Webhooks {
pub update: GatewayEvent<types::WebhooksUpdate>,
}
}

View File

@ -3,7 +3,7 @@ use log::*;
use std::fmt::Debug;
use super::{event::Events, *};
use super::{events::Events, *};
use crate::types::{self, Composite};
/// Represents a handle to a Gateway connection. A Gateway connection will create observable

View File

@ -22,7 +22,7 @@ use super::*;
use crate::types;
/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms
const HEARTBEAT_ACK_TIMEOUT: u64 = 2000;
pub const HEARTBEAT_ACK_TIMEOUT: u64 = 2000;
/// Handles sending heartbeats to the gateway in another thread
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used

View File

@ -94,6 +94,12 @@ pub struct GatewayEvent<T: WebSocketEvent> {
}
impl<T: WebSocketEvent> GatewayEvent<T> {
pub fn new() -> Self {
Self {
observers: Vec::new(),
}
}
/// Returns true if the GatewayEvent is observed by at least one Observer.
pub fn is_observed(&self) -> bool {
!self.observers.is_empty()
@ -116,7 +122,7 @@ impl<T: WebSocketEvent> GatewayEvent<T> {
}
/// Notifies the observers of the GatewayEvent.
async fn notify(&self, new_event_data: T) {
pub(crate) async fn notify(&self, new_event_data: T) {
for observer in &self.observers {
observer.update(&new_event_data).await;
}

View File

@ -128,7 +128,10 @@ pub mod instance;
#[cfg(feature = "client")]
pub mod ratelimiter;
pub mod types;
#[cfg(feature = "client")]
#[cfg(all(
feature = "client",
any(feature = "voice_udp", feature = "voice_gateway")
))]
pub mod voice;
#[derive(Clone, Default, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]

View File

@ -34,6 +34,7 @@ pub struct VoiceState {
pub channel_id: Option<Snowflake>,
pub user_id: Snowflake,
pub member: Option<Arc<RwLock<GuildMember>>>,
/// Includes alphanumeric characters, not a snowflake
pub session_id: String,
pub token: Option<String>,
pub deaf: bool,

View File

@ -24,8 +24,8 @@ pub use stage_instance::*;
pub use thread::*;
pub use user::*;
pub use voice::*;
pub use voice_gateway::*;
pub use webhooks::*;
pub use webrtc::*;
#[cfg(feature = "client")]
use super::Snowflake;
@ -72,7 +72,7 @@ mod user;
mod voice;
mod webhooks;
mod webrtc;
mod voice_gateway;
pub trait WebSocketEvent: Send + Sync + Debug {}

View File

@ -34,7 +34,10 @@ impl WebSocketEvent for VoiceStateUpdate {}
/// Received to indicate which voice endpoint, token and guild_id to use;
pub struct VoiceServerUpdate {
pub token: String,
pub guild_id: Snowflake,
/// The guild this voice server update is for
pub guild_id: Option<Snowflake>,
/// The private channel this voice server update is for
pub channel_id: Option<Snowflake>,
pub endpoint: Option<String>,
}

View File

@ -0,0 +1,36 @@
use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// Sent when another user connects to the voice server.
///
/// Contains the user id and "flags".
///
/// Not documented anywhere, if you know what this is, please reach out
///
/// {"op":18,"d":{"user_id":"1234567890","flags":2}}
pub struct VoiceClientConnectFlags {
pub user_id: Snowflake,
// Likely some sort of bitflags
//
// Not always sent, sometimes null?
pub flags: Option<u8>,
}
impl WebSocketEvent for VoiceClientConnectFlags {}
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// Sent when another user connects to the voice server.
///
/// Contains the user id and "platform".
///
/// Not documented anywhere, if you know what this is, please reach out
///
/// {"op":20,"d":{"user_id":"1234567890","platform":0}}
pub struct VoiceClientConnectPlatform {
pub user_id: Snowflake,
// Likely an enum
pub platform: u8,
}
impl WebSocketEvent for VoiceClientConnectPlatform {}

View File

@ -0,0 +1,14 @@
use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// Sent when another user disconnects from the voice server.
///
/// When received, the SSRC of the user should be discarded.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#other-client-disconnection>
pub struct VoiceClientDisconnection {
pub user_id: Snowflake,
}
impl WebSocketEvent for VoiceClientDisconnection {}

View File

@ -0,0 +1,20 @@
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// Contains info on how often the client should send heartbeats to the server;
///
/// Differs from the normal hello data in that discord sends heartbeat interval as a float.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#heartbeating>
pub struct VoiceHelloData {
/// The voice gateway version.
///
/// Note: no idea why this is sent, we already specify the version when establishing a connection.
#[serde(rename = "v")]
pub version: u8,
/// How often a client should send heartbeats, in milliseconds
pub heartbeat_interval: f64,
}
impl WebSocketEvent for VoiceHelloData {}

View File

@ -0,0 +1,21 @@
use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
/// The identify payload for the voice gateway connection;
///
/// Contains authentication info and context to authenticate to the voice gateway.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#identify-structure>
pub struct VoiceIdentify {
/// The ID of the guild or the private channel being connected to
pub server_id: Snowflake,
pub user_id: Snowflake,
pub session_id: String,
pub token: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub video: Option<bool>,
// TODO: Add video streams
}
impl WebSocketEvent for VoiceIdentify {}

View File

@ -0,0 +1,14 @@
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Copy)]
/// What does this do?
///
/// {"op":15,"d":{"any":100}}
///
/// Opcode from <https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#voice-opcodes>
pub struct VoiceMediaSinkWants {
pub any: u16,
}
impl WebSocketEvent for VoiceMediaSinkWants {}

View File

@ -0,0 +1,140 @@
use super::WebSocketEvent;
use serde::{Deserialize, Serialize};
use serde_json::{value::RawValue, Value};
pub use client_connect::*;
pub use client_disconnect::*;
pub use hello::*;
pub use identify::*;
pub use media_sink_wants::*;
pub use ready::*;
pub use select_protocol::*;
pub use session_description::*;
pub use speaking::*;
pub use ssrc_definition::*;
pub use voice_backend_version::*;
mod client_connect;
mod client_disconnect;
mod hello;
mod identify;
mod media_sink_wants;
mod ready;
mod select_protocol;
mod session_description;
mod speaking;
mod ssrc_definition;
mod voice_backend_version;
#[derive(Debug, Default, Serialize, Clone)]
/// The payload used for sending events to the voice gateway.
///
/// Similar to [VoiceGatewayReceivePayload], except we send a [Value] for d whilst we receive a [serde_json::value::RawValue]
pub struct VoiceGatewaySendPayload {
#[serde(rename = "op")]
pub op_code: u8,
#[serde(rename = "d")]
pub data: Value,
}
impl WebSocketEvent for VoiceGatewaySendPayload {}
#[derive(Debug, Deserialize, Clone)]
/// The payload used for receiving events from the voice gateway.
///
/// Note that this is similar to the regular gateway, except we no longer have s or t
///
/// Similar to [VoiceGatewaySendPayload], except we send a [Value] for d whilst we receive a [serde_json::value::RawValue]
pub struct VoiceGatewayReceivePayload<'a> {
#[serde(rename = "op")]
pub op_code: u8,
#[serde(borrow)]
#[serde(rename = "d")]
pub data: &'a RawValue,
}
impl<'a> WebSocketEvent for VoiceGatewayReceivePayload<'a> {}
/// The modes of encryption available in voice udp connections;
///
/// Not all encryption modes are implemented; it is generally recommended
/// to use either [[VoiceEncryptionMode::Xsalsa20Poly1305]] or
/// [[VoiceEncryptionMode::Xsalsa20Poly1305Suffix]]
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#encryption-mode> and <https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-udp-connection-encryption-modes>
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum VoiceEncryptionMode {
#[default]
// Officially Documented
/// Use XSalsa20Poly1305 encryption, using the rtp header as a nonce.
///
/// Fully implemented
Xsalsa20Poly1305,
/// Use XSalsa20Poly1305 encryption, using a random 24 byte suffix as a nonce.
///
/// Fully implemented
Xsalsa20Poly1305Suffix,
/// Use XSalsa20Poly1305 encryption, using a 4 byte incremental value as a nonce.
///
/// Fully implemented
Xsalsa20Poly1305Lite,
// Officially Undocumented
/// Not implemented yet, we have no idea what the rtpsize nonces are.
Xsalsa20Poly1305LiteRtpsize,
/// Not implemented yet
AeadAes256Gcm,
/// Not implemented yet
AeadAes256GcmRtpsize,
/// Not implemented yet, we have no idea what the rtpsize nonces are.
AeadXchacha20Poly1305Rtpsize,
}
/// The possible audio codecs to use
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AudioCodec {
#[default]
Opus,
}
/// The possible video codecs to use
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum VideoCodec {
#[default]
VP8,
VP9,
H264,
}
// The various voice opcodes
pub const VOICE_IDENTIFY: u8 = 0;
pub const VOICE_SELECT_PROTOCOL: u8 = 1;
pub const VOICE_READY: u8 = 2;
pub const VOICE_HEARTBEAT: u8 = 3;
pub const VOICE_SESSION_DESCRIPTION: u8 = 4;
pub const VOICE_SPEAKING: u8 = 5;
pub const VOICE_HEARTBEAT_ACK: u8 = 6;
pub const VOICE_RESUME: u8 = 7;
pub const VOICE_HELLO: u8 = 8;
pub const VOICE_RESUMED: u8 = 9;
pub const VOICE_SSRC_DEFINITION: u8 = 12;
pub const VOICE_CLIENT_DISCONNECT: u8 = 13;
pub const VOICE_SESSION_UPDATE: u8 = 14;
/// What is this?
///
/// {"op":15,"d":{"any":100}}
///
/// Opcode from <https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#voice-opcodes>
pub const VOICE_MEDIA_SINK_WANTS: u8 = 15;
/// See <https://discord-userdoccers.vercel.app/topics/opcodes-and-status-codes#voice-opcodes>
/// Sent with empty data from the client, the server responds with the voice backend version;
pub const VOICE_BACKEND_VERSION: u8 = 16;
// These two get simultaenously fired when a user joins, one has flags and one has a platform
pub const VOICE_CLIENT_CONNECT_FLAGS: u8 = 18;
pub const VOICE_CLIENT_CONNECT_PLATFORM: u8 = 20;

View File

@ -0,0 +1,42 @@
use std::net::Ipv4Addr;
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
use super::VoiceEncryptionMode;
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
/// The voice gateway's ready event;
///
/// Gives the user info about the udp connection ip and port, srrc to use,
/// available encryption modes and other data.
///
/// Sent in response to an Identify event.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#ready-structure>
pub struct VoiceReady {
/// See <https://developer.mozilla.org/en-US/docs/Web/API/RTCRtpStreamStats/ssrc>
pub ssrc: u32,
pub ip: Ipv4Addr,
pub port: u16,
/// The available encryption modes for the udp connection
pub modes: Vec<VoiceEncryptionMode>,
#[serde(default)]
pub experiments: Vec<String>,
// TODO: Add video streams
// Heartbeat interval is also sent, but is "an erroneous field and should be ignored. The correct heartbeat_interval value comes from the Hello payload."
}
impl Default for VoiceReady {
fn default() -> Self {
VoiceReady {
ssrc: 1,
ip: Ipv4Addr::UNSPECIFIED,
port: 0,
modes: Vec::new(),
experiments: Vec::new(),
}
}
}
impl WebSocketEvent for VoiceReady {}

View File

@ -0,0 +1,48 @@
use serde::{Deserialize, Serialize};
use super::VoiceEncryptionMode;
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// An event sent by the client to the voice gateway server,
/// detailing what protocol, address and encryption to use;
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#select-protocol-structure>
pub struct SelectProtocol {
/// The protocol to use. The only option chorus supports is [VoiceProtocol::Udp].
pub protocol: VoiceProtocol,
pub data: SelectProtocolData,
/// The UUID4 RTC connection ID, used for analytics.
///
/// Note: Not recommended to set this
pub rtc_connection_id: Option<String>,
// TODO: Add codecs, what is a codec object
/// The possible experiments we want to enable
#[serde(rename = "experiments")]
pub enabled_experiments: Vec<String>,
}
/// The possible protocol for sending a receiving voice data.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#select-protocol-structure>
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum VoiceProtocol {
#[default]
/// Sending data via UDP, documented and the only protocol chorus supports.
Udp,
// Possible value, yet NOT RECOMMENED, AS CHORUS DOES NOT SUPPORT WEBRTC
//Webrtc,
}
#[derive(Debug, Default, Deserialize, Serialize, Clone)]
/// The data field of the SelectProtocol Event
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#protocol-data-structure>
pub struct SelectProtocolData {
/// Our external ip we got from ip discovery
pub address: String,
/// Our external udp port we got from id discovery
pub port: u16,
/// The mode of encryption to use
pub mode: VoiceEncryptionMode,
}

View File

@ -0,0 +1,39 @@
use super::{AudioCodec, VideoCodec, VoiceEncryptionMode};
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// Event that describes our encryption mode and secret key for encryption
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#session-description-structure>
pub struct SessionDescription {
pub audio_codec: AudioCodec,
pub video_codec: VideoCodec,
pub media_session_id: String,
/// The encryption mode to use
#[serde(rename = "mode")]
pub encryption_mode: VoiceEncryptionMode,
/// The secret key we'll use for encryption
pub secret_key: [u8; 32],
/// The keyframe interval in milliseconds
pub keyframe_interval: Option<u64>,
}
impl WebSocketEvent for SessionDescription {}
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// Event that might be sent to update session parameters
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#session-update-structure>
pub struct SessionUpdate {
#[serde(rename = "audio_codec")]
pub new_audio_codec: Option<AudioCodec>,
#[serde(rename = "video_codec")]
pub new_video_codec: Option<VideoCodec>,
#[serde(rename = "media_session_id")]
pub new_media_session_id: Option<String>,
}
impl WebSocketEvent for SessionUpdate {}

View File

@ -0,0 +1,48 @@
use bitflags::bitflags;
use serde::{Deserialize, Serialize};
use crate::types::{Snowflake, WebSocketEvent};
/// Event that tells the server we are speaking;
///
/// Essentially, what allows us to send udp data and lights up the green circle around your avatar.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#speaking-structure>
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct Speaking {
/// Data about the audio we're transmitting.
///
/// See [SpeakingBitflags]
pub speaking: u8,
pub ssrc: u32,
/// The user id of the speaking user, only sent by the server
#[serde(skip_serializing)]
pub user_id: Option<Snowflake>,
/// Delay in milliseconds, not sent by the server
#[serde(default)]
pub delay: u64,
}
impl WebSocketEvent for Speaking {}
bitflags! {
/// Bitflags of speaking types;
///
/// See <https://discord.com/developers/docs/topics/voice-connections#speaking>
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Serialize, Deserialize)]
pub struct SpeakingBitflags: u8 {
/// Whether we'll be transmitting normal voice audio
const MICROPHONE = 1 << 0;
/// Whether we'll be transmitting context audio for video, no speaking indicator
const SOUNDSHARE = 1 << 1;
/// Whether we are a priority speaker, lowering audio of other speakers
const PRIORITY = 1 << 2;
}
}
impl Default for SpeakingBitflags {
/// Returns the default value for these flags, assuming normal microphone audio and not being a priority speaker
fn default() -> Self {
Self::MICROPHONE
}
}

View File

@ -0,0 +1,49 @@
use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
/// Defines an event which provides ssrcs for voice and video for a user id.
///
/// This event is sent when we begin to speak.
///
/// It must be sent before sending audio, or else clients will not be able to play the stream.
///
/// This event is sent via opcode 12.
///
/// Examples of the event:
///
/// When receiving:
/// ```json
/// {"op":12,"d":{"video_ssrc":0,"user_id":"463640391196082177","streams":[{"ssrc":26595,"rtx_ssrc":26596,"rid":"100","quality":100,"max_resolution":{"width":1280,"type":"fixed","height":720},"max_framerate":30,"active":false}],"audio_ssrc":26597}}{"op":12,"d":{"video_ssrc":0,"user_id":"463640391196082177","streams":[{"ssrc":26595,"rtx_ssrc":26596,"rid":"100","quality":100,"max_resolution":{"width":1280,"type":"fixed","height":720},"max_framerate":30,"active":false}],"audio_ssrc":26597}}
/// ```
///
/// When sending:
/// ```json
/// {"op":12,"d":{"audio_ssrc":2307250864,"video_ssrc":0,"rtx_ssrc":0,"streams":[{"type":"video","rid":"100","ssrc":26595,"active":false,"quality":100,"rtx_ssrc":26596,"max_bitrate":2500000,"max_framerate":30,"max_resolution":{"type":"fixed","width":1280,"height":720}}]}}
/// ```
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
pub struct SsrcDefinition {
/// The ssrc used for video communications.
///
/// Is always sent and received, though is 0 if describing only the audio ssrc.
#[serde(default)]
pub video_ssrc: usize,
/// The ssrc used for audio communications.
///
/// Is always sent and received, though is 0 if describing only the video ssrc.
#[serde(default)]
pub audio_ssrc: usize,
// Not sure what this is
// It is usually 0
#[serde(default)]
pub rtx_ssrc: usize,
/// The user id these ssrcs apply to.
///
/// Is never sent by the user and is filled in by the server
#[serde(skip_serializing)]
pub user_id: Option<Snowflake>,
// TODO: Add video streams
#[serde(default)]
pub streams: Vec<String>,
}
impl WebSocketEvent for SsrcDefinition {}

View File

@ -0,0 +1,17 @@
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq)]
/// Received from the voice gateway server to describe the backend version.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#voice-backend-version>
pub struct VoiceBackendVersion {
/// The voice backend's version
#[serde(rename = "voice")]
pub voice_version: String,
/// The WebRTC worker's version
#[serde(rename = "rtc_worker")]
pub rtc_worker_version: String,
}
impl WebSocketEvent for VoiceBackendVersion {}

View File

@ -1,18 +0,0 @@
use crate::types::{Snowflake, WebSocketEvent};
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
/// The identify payload for the webrtc stream;
/// Contains info to begin a webrtc connection;
/// See https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection-example-voice-identify-payload;
pub struct VoiceIdentify {
server_id: Snowflake,
user_id: Snowflake,
session_id: String,
token: String,
#[serde(skip_serializing_if = "Option::is_none")]
/// Undocumented field, but is also in discord client comms
video: Option<bool>,
}
impl WebSocketEvent for VoiceIdentify {}

View File

@ -1,5 +0,0 @@
pub use identify::*;
pub use ready::*;
mod identify;
mod ready;

View File

@ -1,29 +0,0 @@
use std::net::Ipv4Addr;
use crate::types::WebSocketEvent;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
/// The ready event for the webrtc stream;
/// Used to give info after the identify event;
/// See https://discord.com/developers/docs/topics/voice-connections#establishing-a-voice-websocket-connection-example-voice-ready-payload;
pub struct VoiceReady {
ssrc: i32,
ip: Ipv4Addr,
port: u32,
modes: Vec<String>,
// Heartbeat interval is also sent, but is "an erroneous field and should be ignored. The correct heartbeat_interval value comes from the Hello payload."
}
impl Default for VoiceReady {
fn default() -> Self {
VoiceReady {
ssrc: 1,
ip: Ipv4Addr::UNSPECIFIED,
port: 0,
modes: Vec::new(),
}
}
}
impl WebSocketEvent for VoiceReady {}

View File

@ -1,2 +0,0 @@
//! Where the voice chat implementation will be, once it's finished.
//! For development on voice, see the feature/voice branch.

86
src/voice/crypto.rs Normal file
View File

@ -0,0 +1,86 @@
//! Defines cryptography functions used within the voice implementation.
//!
//! All functions in this module return a 24 byte long `Vec<u8>`.
/// Gets an `xsalsa20_poly1305` nonce from an rtppacket.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#encryption-mode>
pub(crate) fn get_xsalsa20_poly1305_nonce(packet: &[u8]) -> Vec<u8> {
let mut rtp_header = Vec::with_capacity(24);
rtp_header.append(&mut packet[0..12].to_vec());
// The header is only 12 bytes, but the nonce has to be 24
while rtp_header.len() < 24 {
rtp_header.push(0);
}
rtp_header
}
/// Gets an `xsalsa20_poly1305_suffix` nonce from an rtppacket.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#encryption-mode>
pub(crate) fn get_xsalsa20_poly1305_suffix_nonce(packet: &[u8]) -> Vec<u8> {
let mut nonce = Vec::with_capacity(24);
nonce.append(&mut packet[(packet.len() - 24)..packet.len()].to_vec());
nonce
}
/// Gets an `xsalsa20_poly1305_lite` nonce from an rtppacket.
///
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#encryption-mode>
pub(crate) fn get_xsalsa20_poly1305_lite_nonce(packet: &[u8]) -> Vec<u8> {
let mut nonce = Vec::with_capacity(24);
nonce.append(&mut packet[(packet.len() - 4)..packet.len()].to_vec());
// The suffix is only 4 bytes, but the nonce has to be 24
while nonce.len() < 24 {
nonce.push(0);
}
nonce
}
#[test]
// Asserts all functions that retrieve a nonce from packet bytes
fn test_packet_nonce_derives() {
let test_packet_bytes = vec![
144, 120, 98, 5, 71, 174, 52, 64, 0, 4, 85, 36, 178, 8, 37, 146, 35, 154, 141, 36, 125, 15,
65, 179, 227, 108, 165, 56, 68, 68, 3, 62, 87, 233, 7, 81, 147, 93, 22, 95, 115, 202, 48,
66, 190, 229, 69, 146, 66, 108, 60, 114, 2, 228, 111, 40, 108, 5, 68, 226, 76, 240, 20,
231, 210, 214, 123, 175, 188, 161, 10, 125, 13, 196, 114, 248, 50, 84, 103, 139, 86, 223,
82, 173, 8, 209, 78, 188, 169, 151, 157, 42, 189, 153, 228, 105, 199, 19, 185, 16, 33, 133,
113, 253, 145, 36, 106, 14, 222, 128, 226, 239, 10, 39, 72, 113, 33, 113,
];
let nonce_1 = get_xsalsa20_poly1305_nonce(&test_packet_bytes);
let nonce_1_expected = vec![
144, 120, 98, 5, 71, 174, 52, 64, 0, 4, 85, 36, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
let nonce_2 = get_xsalsa20_poly1305_suffix_nonce(&test_packet_bytes);
let nonce_2_expected = vec![
228, 105, 199, 19, 185, 16, 33, 133, 113, 253, 145, 36, 106, 14, 222, 128, 226, 239, 10,
39, 72, 113, 33, 113,
];
let nonce_3 = get_xsalsa20_poly1305_lite_nonce(&test_packet_bytes);
let nonce_3_expected = vec![
72, 113, 33, 113, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
println!("nonce 1: {:?}", nonce_1);
println!("nonce 2: {:?}", nonce_2);
println!("nonce 3: {:?}", nonce_3);
assert_eq!(nonce_1.len(), 24);
assert_eq!(nonce_2.len(), 24);
assert_eq!(nonce_3.len(), 24);
assert_eq!(nonce_1, nonce_1_expected);
assert_eq!(nonce_2, nonce_2_expected);
assert_eq!(nonce_3, nonce_3_expected);
}

View File

@ -0,0 +1,23 @@
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub mod tungstenite;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub use tungstenite::*;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub mod wasm;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub use wasm::*;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub type Sink = tungstenite::TungsteniteSink;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub type Stream = tungstenite::TungsteniteStream;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_gateway"))]
pub type WebSocketBackend = tungstenite::TungsteniteBackend;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub type Sink = wasm::WasmSink;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub type Stream = wasm::WasmStream;
#[cfg(all(target_arch = "wasm32", feature = "voice_gateway"))]
pub type WebSocketBackend = wasm::WasmBackend;

View File

@ -0,0 +1,65 @@
use futures_util::{
stream::{SplitSink, SplitStream},
StreamExt,
};
use tokio::net::TcpStream;
use tokio_tungstenite::{
connect_async_tls_with_config, tungstenite, Connector, MaybeTlsStream, WebSocketStream,
};
use crate::{errors::VoiceGatewayError, voice::gateway::VoiceGatewayMessage};
#[derive(Debug, Clone)]
pub struct TungsteniteBackend;
// These could be made into inherent associated types when that's stabilized
pub type TungsteniteSink =
SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>;
pub type TungsteniteStream = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
impl TungsteniteBackend {
pub async fn connect(
websocket_url: &str,
) -> Result<(TungsteniteSink, TungsteniteStream), crate::errors::VoiceGatewayError> {
let mut roots = rustls::RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs")
{
roots.add(&rustls::Certificate(cert.0)).unwrap();
}
let (websocket_stream, _) = match connect_async_tls_with_config(
websocket_url,
None,
false,
Some(Connector::Rustls(
rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth()
.into(),
)),
)
.await
{
Ok(websocket_stream) => websocket_stream,
Err(e) => {
return Err(VoiceGatewayError::CannotConnect {
error: e.to_string(),
})
}
};
Ok(websocket_stream.split())
}
}
impl From<VoiceGatewayMessage> for tungstenite::Message {
fn from(message: VoiceGatewayMessage) -> Self {
Self::Text(message.0)
}
}
impl From<tungstenite::Message> for VoiceGatewayMessage {
fn from(value: tungstenite::Message) -> Self {
Self(value.to_string())
}
}

View File

@ -0,0 +1,48 @@
use futures_util::{
stream::{SplitSink, SplitStream},
StreamExt,
};
use ws_stream_wasm::*;
use crate::errors::VoiceGatewayError;
use crate::voice::gateway::VoiceGatewayMessage;
#[derive(Debug, Clone)]
pub struct WasmBackend;
// These could be made into inherent associated types when that's stabilized
pub type WasmSink = SplitSink<WsStream, WsMessage>;
pub type WasmStream = SplitStream<WsStream>;
impl WasmBackend {
pub async fn connect(websocket_url: &str) -> Result<(WasmSink, WasmStream), VoiceGatewayError> {
let (_, websocket_stream) = match WsMeta::connect(websocket_url, None).await {
Ok(stream) => Ok(stream),
Err(e) => Err(VoiceGatewayError::CannotConnect {
error: e.to_string(),
}),
}?;
Ok(websocket_stream.split())
}
}
impl From<VoiceGatewayMessage> for WsMessage {
fn from(message: VoiceGatewayMessage) -> Self {
Self::Text(message.0)
}
}
impl From<WsMessage> for VoiceGatewayMessage {
fn from(value: WsMessage) -> Self {
match value {
WsMessage::Text(text) => Self(text),
WsMessage::Binary(bin) => {
let mut text = String::new();
let _ = bin.iter().map(|v| text.push_str(&v.to_string()));
Self(text)
}
}
}
}

View File

@ -0,0 +1,24 @@
use crate::{
errors::VoiceGatewayError,
gateway::GatewayEvent,
types::{
SessionDescription, SessionUpdate, Speaking, SsrcDefinition, VoiceBackendVersion,
VoiceClientConnectFlags, VoiceClientConnectPlatform, VoiceClientDisconnection,
VoiceMediaSinkWants, VoiceReady,
},
};
#[derive(Default, Debug)]
pub struct VoiceEvents {
pub voice_ready: GatewayEvent<VoiceReady>,
pub backend_version: GatewayEvent<VoiceBackendVersion>,
pub session_description: GatewayEvent<SessionDescription>,
pub session_update: GatewayEvent<SessionUpdate>,
pub speaking: GatewayEvent<Speaking>,
pub ssrc_definition: GatewayEvent<SsrcDefinition>,
pub client_disconnect: GatewayEvent<VoiceClientDisconnection>,
pub client_connect_flags: GatewayEvent<VoiceClientConnectFlags>,
pub client_connect_platform: GatewayEvent<VoiceClientConnectPlatform>,
pub media_sink_wants: GatewayEvent<VoiceMediaSinkWants>,
pub error: GatewayEvent<VoiceGatewayError>,
}

View File

@ -0,0 +1,335 @@
use std::{sync::Arc, time::Duration};
use log::*;
use tokio::sync::Mutex;
use futures_util::SinkExt;
use futures_util::StreamExt;
use crate::{
errors::VoiceGatewayError,
gateway::GatewayEvent,
types::{
VoiceGatewayReceivePayload, VoiceHelloData, WebSocketEvent, VOICE_BACKEND_VERSION,
VOICE_CLIENT_CONNECT_FLAGS, VOICE_CLIENT_CONNECT_PLATFORM, VOICE_CLIENT_DISCONNECT,
VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK, VOICE_HELLO, VOICE_IDENTIFY, VOICE_MEDIA_SINK_WANTS,
VOICE_READY, VOICE_RESUME, VOICE_SELECT_PROTOCOL, VOICE_SESSION_DESCRIPTION,
VOICE_SESSION_UPDATE, VOICE_SPEAKING, VOICE_SSRC_DEFINITION,
},
voice::gateway::{
heartbeat::VoiceHeartbeatThreadCommunication, VoiceGatewayMessage, WebSocketBackend,
},
};
use super::{
events::VoiceEvents, heartbeat::VoiceHeartbeatHandler, Sink, Stream, VoiceGatewayHandle,
};
#[derive(Debug)]
pub struct VoiceGateway {
events: Arc<Mutex<VoiceEvents>>,
heartbeat_handler: VoiceHeartbeatHandler,
websocket_send: Arc<Mutex<Sink>>,
websocket_receive: Stream,
kill_send: tokio::sync::broadcast::Sender<()>,
}
impl VoiceGateway {
#[allow(clippy::new_ret_no_self)]
pub async fn spawn(websocket_url: String) -> Result<VoiceGatewayHandle, VoiceGatewayError> {
// Append the needed things to the websocket url
let processed_url = format!("wss://{}/?v=7", websocket_url);
trace!("Created voice socket url: {}", processed_url.clone());
let (websocket_send, mut websocket_receive) =
WebSocketBackend::connect(&processed_url).await?;
let shared_websocket_send = Arc::new(Mutex::new(websocket_send));
// Create a shared broadcast channel for killing all gateway tasks
let (kill_send, mut _kill_receive) = tokio::sync::broadcast::channel::<()>(16);
// Wait for the first hello and then spawn both tasks so we avoid nested tasks
// This automatically spawns the heartbeat task, but from the main thread
#[cfg(not(target_arch = "wasm32"))]
let msg: VoiceGatewayMessage = websocket_receive.next().await.unwrap().unwrap().into();
#[cfg(target_arch = "wasm32")]
let msg: VoiceGatewayMessage = websocket_receive.next().await.unwrap().into();
let gateway_payload: VoiceGatewayReceivePayload = serde_json::from_str(&msg.0).unwrap();
if gateway_payload.op_code != VOICE_HELLO {
return Err(VoiceGatewayError::NonHelloOnInitiate {
opcode: gateway_payload.op_code,
});
}
info!("VGW: Received Hello");
// The hello data for voice gateways is in float milliseconds, so we convert it to f64 seconds
let gateway_hello: VoiceHelloData =
serde_json::from_str(gateway_payload.data.get()).unwrap();
let heartbeat_interval_seconds: f64 = gateway_hello.heartbeat_interval / 1000.0;
let voice_events = VoiceEvents::default();
let shared_events = Arc::new(Mutex::new(voice_events));
let mut gateway = VoiceGateway {
events: shared_events.clone(),
heartbeat_handler: VoiceHeartbeatHandler::new(
Duration::from_secs_f64(heartbeat_interval_seconds),
1, // to:do actually compute nonce
shared_websocket_send.clone(),
kill_send.subscribe(),
),
websocket_send: shared_websocket_send.clone(),
websocket_receive,
kill_send: kill_send.clone(),
};
// Now we can continuously check for messages in a different task, since we aren't going to receive another hello
#[cfg(not(target_arch = "wasm32"))]
tokio::task::spawn(async move {
gateway.gateway_listen_task().await;
});
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
gateway.gateway_listen_task().await;
});
Ok(VoiceGatewayHandle {
url: websocket_url.clone(),
events: shared_events,
websocket_send: shared_websocket_send.clone(),
kill_send: kill_send.clone(),
})
}
/// The main gateway listener task;
///
/// Can only be stopped by closing the websocket, cannot be made to listen for kill
pub async fn gateway_listen_task(&mut self) {
loop {
let msg = self.websocket_receive.next().await;
// PRETTYFYME: Remove inline conditional compiling
#[cfg(not(target_arch = "wasm32"))]
if let Some(Ok(message)) = msg {
self.handle_message(message.into()).await;
continue;
}
#[cfg(target_arch = "wasm32")]
if let Some(message) = msg {
self.handle_message(message.into()).await;
continue;
}
// We couldn't receive the next message or it was an error, something is wrong with the websocket, close
warn!("VGW: Websocket is broken, stopping gateway");
break;
}
}
/// Closes the websocket connection and stops all tasks
async fn close(&mut self) {
self.kill_send.send(()).unwrap();
self.websocket_send.lock().await.close().await.unwrap();
}
/// Deserializes and updates a dispatched event, when we already know its type;
/// (Called for every event in handle_message)
async fn handle_event<'a, T: WebSocketEvent + serde::Deserialize<'a>>(
data: &'a str,
event: &mut GatewayEvent<T>,
) -> Result<(), serde_json::Error> {
let data_deserialize_result: Result<T, serde_json::Error> = serde_json::from_str(data);
if data_deserialize_result.is_err() {
return Err(data_deserialize_result.err().unwrap());
}
event.notify(data_deserialize_result.unwrap()).await;
Ok(())
}
/// This handles a message as a websocket event and updates its events along with the events' observers
pub async fn handle_message(&mut self, msg: VoiceGatewayMessage) {
if msg.0.is_empty() {
return;
}
let Ok(gateway_payload) = msg.payload() else {
if let Some(error) = msg.error() {
warn!("GW: Received error {:?}, connection will close..", error);
self.close().await;
self.events.lock().await.error.notify(error).await;
} else {
warn!(
"Message unrecognised: {:?}, please open an issue on the chorus github",
msg.0
);
}
return;
};
// See <https://discord.com/developers/docs/topics/voice-connections>
match gateway_payload.op_code {
VOICE_READY => {
trace!("VGW: Received READY!");
let event = &mut self.events.lock().await.voice_ready;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!("Failed to parse VOICE_READY ({})", result.err().unwrap());
}
}
VOICE_BACKEND_VERSION => {
trace!("VGW: Received Backend Version");
let event = &mut self.events.lock().await.backend_version;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_BACKEND_VERSION ({})",
result.err().unwrap()
);
}
}
VOICE_SESSION_DESCRIPTION => {
trace!("VGW: Received Session Description");
let event = &mut self.events.lock().await.session_description;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_SESSION_DESCRIPTION ({})",
result.err().unwrap()
);
}
}
VOICE_SESSION_UPDATE => {
trace!("VGW: Received Session Update");
let event = &mut self.events.lock().await.session_update;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_SESSION_UPDATE ({})",
result.err().unwrap()
);
}
}
VOICE_SPEAKING => {
trace!("VGW: Received Speaking");
let event = &mut self.events.lock().await.speaking;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!("Failed to parse VOICE_SPEAKING ({})", result.err().unwrap());
}
}
VOICE_SSRC_DEFINITION => {
trace!("VGW: Received Ssrc Definition");
let event = &mut self.events.lock().await.ssrc_definition;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_SSRC_DEFINITION ({})",
result.err().unwrap()
);
}
}
VOICE_CLIENT_DISCONNECT => {
trace!("VGW: Received Client Disconnect");
let event = &mut self.events.lock().await.client_disconnect;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_CLIENT_DISCONNECT ({})",
result.err().unwrap()
);
}
}
VOICE_CLIENT_CONNECT_FLAGS => {
trace!("VGW: Received Client Connect Flags");
let event = &mut self.events.lock().await.client_connect_flags;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_CLIENT_CONNECT_FLAGS ({})",
result.err().unwrap()
);
}
}
VOICE_CLIENT_CONNECT_PLATFORM => {
trace!("VGW: Received Client Connect Platform");
let event = &mut self.events.lock().await.client_connect_platform;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_CLIENT_CONNECT_PLATFORM ({})",
result.err().unwrap()
);
}
}
VOICE_MEDIA_SINK_WANTS => {
trace!("VGW: Received Media Sink Wants");
let event = &mut self.events.lock().await.media_sink_wants;
let result = VoiceGateway::handle_event(gateway_payload.data.get(), event).await;
if result.is_err() {
warn!(
"Failed to parse VOICE_MEDIA_SINK_WANTS ({})",
result.err().unwrap()
);
}
}
// We received a heartbeat from the server
// "Discord may send the app a Heartbeat (opcode 1) event, in which case the app should send a Heartbeat event immediately."
VOICE_HEARTBEAT => {
trace!("VGW: Received Heartbeat // Heartbeat Request");
// Tell the heartbeat handler it should send a heartbeat right away
let heartbeat_communication = VoiceHeartbeatThreadCommunication {
updated_nonce: None,
op_code: Some(VOICE_HEARTBEAT),
};
self.heartbeat_handler
.send
.send(heartbeat_communication)
.await
.unwrap();
}
VOICE_HEARTBEAT_ACK => {
trace!("VGW: Received Heartbeat ACK");
// Tell the heartbeat handler we received an ack
let heartbeat_communication = VoiceHeartbeatThreadCommunication {
updated_nonce: None,
op_code: Some(VOICE_HEARTBEAT_ACK),
};
self.heartbeat_handler
.send
.send(heartbeat_communication)
.await
.unwrap();
}
VOICE_IDENTIFY | VOICE_SELECT_PROTOCOL | VOICE_RESUME => {
info!(
"VGW: Received unexpected opcode ({}) for current state. This might be due to a faulty server implementation and is likely not the fault of chorus.",
gateway_payload.op_code
);
}
_ => {
warn!("VGW: Received unrecognized voice gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code);
}
}
}
}

101
src/voice/gateway/handle.rs Normal file
View File

@ -0,0 +1,101 @@
use std::sync::Arc;
use log::*;
use futures_util::SinkExt;
use serde_json::json;
use tokio::sync::Mutex;
use crate::types::{
SelectProtocol, Speaking, SsrcDefinition, VoiceGatewaySendPayload, VoiceIdentify,
VOICE_BACKEND_VERSION, VOICE_IDENTIFY, VOICE_SELECT_PROTOCOL, VOICE_SPEAKING,
VOICE_SSRC_DEFINITION,
};
use super::{events::VoiceEvents, Sink, VoiceGatewayMessage};
/// Represents a handle to a Voice Gateway connection.
/// Using this handle you can send Gateway Events directly.
#[derive(Debug, Clone)]
pub struct VoiceGatewayHandle {
pub url: String,
pub events: Arc<Mutex<VoiceEvents>>,
pub websocket_send: Arc<Mutex<Sink>>,
/// Tells gateway tasks to close
pub(super) kill_send: tokio::sync::broadcast::Sender<()>,
}
impl VoiceGatewayHandle {
/// Sends json to the gateway with an opcode
async fn send_json(&self, op_code: u8, to_send: serde_json::Value) {
let gateway_payload = VoiceGatewaySendPayload {
op_code,
data: to_send,
};
let payload_json = serde_json::to_string(&gateway_payload).unwrap();
let message = VoiceGatewayMessage(payload_json);
self.websocket_send
.lock()
.await
.send(message.into())
.await
.unwrap();
}
/// Sends a voice identify event to the gateway
pub async fn send_identify(&self, to_send: VoiceIdentify) {
let to_send_value = serde_json::to_value(&to_send).unwrap();
trace!("VGW: Sending Identify..");
self.send_json(VOICE_IDENTIFY, to_send_value).await;
}
/// Sends a select protocol event to the gateway
pub async fn send_select_protocol(&self, to_send: SelectProtocol) {
let to_send_value = serde_json::to_value(&to_send).unwrap();
trace!("VGW: Sending Select Protocol");
self.send_json(VOICE_SELECT_PROTOCOL, to_send_value).await;
}
/// Sends a speaking event to the gateway
pub async fn send_speaking(&self, to_send: Speaking) {
let to_send_value = serde_json::to_value(&to_send).unwrap();
trace!("VGW: Sending Speaking");
self.send_json(VOICE_SPEAKING, to_send_value).await;
}
/// Sends an ssrc definition event
pub async fn send_ssrc_definition(&self, to_send: SsrcDefinition) {
let to_send_value = serde_json::to_value(&to_send).unwrap();
trace!("VGW: Sending SsrcDefinition");
self.send_json(VOICE_SSRC_DEFINITION, to_send_value).await;
}
/// Sends a voice backend version request to the gateway
pub async fn send_voice_backend_version_request(&self) {
let data_empty_object = json!("{}");
trace!("VGW: Requesting voice backend version");
self.send_json(VOICE_BACKEND_VERSION, data_empty_object)
.await;
}
/// Closes the websocket connection and stops all gateway tasks;
///
/// Esentially pulls the plug on the voice gateway, leaving it possible to resume;
pub async fn close(&self) {
self.kill_send.send(()).unwrap();
self.websocket_send.lock().await.close().await.unwrap();
}
}

View File

@ -0,0 +1,171 @@
use futures_util::SinkExt;
use log::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep_until;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep_until;
use std::{sync::Arc, time::Duration};
use tokio::sync::{
mpsc::{Receiver, Sender},
Mutex,
};
#[cfg(not(target_arch = "wasm32"))]
use tokio::task;
use crate::{
gateway::heartbeat::HEARTBEAT_ACK_TIMEOUT,
types::{VoiceGatewaySendPayload, VOICE_HEARTBEAT, VOICE_HEARTBEAT_ACK},
voice::gateway::VoiceGatewayMessage,
};
use super::Sink;
/// Handles sending heartbeats to the voice gateway in another thread
#[allow(dead_code)] // FIXME: Remove this, once all fields of VoiceHeartbeatHandler are used
#[derive(Debug)]
pub(super) struct VoiceHeartbeatHandler {
/// The heartbeat interval in milliseconds
pub heartbeat_interval: Duration,
/// The send channel for the heartbeat thread
pub send: Sender<VoiceHeartbeatThreadCommunication>,
}
impl VoiceHeartbeatHandler {
pub fn new(
heartbeat_interval: Duration,
starting_nonce: u64,
websocket_tx: Arc<Mutex<Sink>>,
kill_rc: tokio::sync::broadcast::Receiver<()>,
) -> Self {
let (send, receive) = tokio::sync::mpsc::channel(32);
let kill_receive = kill_rc.resubscribe();
#[cfg(not(target_arch = "wasm32"))]
task::spawn(async move {
Self::heartbeat_task(
websocket_tx,
heartbeat_interval,
starting_nonce,
receive,
kill_receive,
)
.await;
});
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(async move {
Self::heartbeat_task(
websocket_tx,
heartbeat_interval,
starting_nonce,
receive,
kill_receive,
)
.await;
});
Self {
heartbeat_interval,
send,
}
}
/// The main heartbeat task;
///
/// Can be killed by the kill broadcast;
/// If the websocket is closed, will die out next time it tries to send a heartbeat;
pub async fn heartbeat_task(
websocket_tx: Arc<Mutex<Sink>>,
heartbeat_interval: Duration,
starting_nonce: u64,
mut receive: Receiver<VoiceHeartbeatThreadCommunication>,
mut kill_receive: tokio::sync::broadcast::Receiver<()>,
) {
let mut last_heartbeat_timestamp: Instant = Instant::now();
let mut last_heartbeat_acknowledged = true;
let mut nonce: u64 = starting_nonce;
loop {
if kill_receive.try_recv().is_ok() {
trace!("VGW: Closing heartbeat task");
break;
}
let timeout = if last_heartbeat_acknowledged {
heartbeat_interval
} else {
// If the server hasn't acknowledged our heartbeat we should resend it
Duration::from_millis(HEARTBEAT_ACK_TIMEOUT)
};
let mut should_send = false;
tokio::select! {
() = sleep_until(last_heartbeat_timestamp + timeout) => {
should_send = true;
}
Some(communication) = receive.recv() => {
// If we received a nonce update, use that nonce now
if communication.updated_nonce.is_some() {
nonce = communication.updated_nonce.unwrap();
}
if let Some(op_code) = communication.op_code {
match op_code {
VOICE_HEARTBEAT => {
// As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately
should_send = true;
}
VOICE_HEARTBEAT_ACK => {
// The server received our heartbeat
last_heartbeat_acknowledged = true;
}
_ => {}
}
}
}
}
if should_send {
trace!("VGW: Sending Heartbeat..");
let heartbeat = VoiceGatewaySendPayload {
op_code: VOICE_HEARTBEAT,
data: nonce.into(),
};
let heartbeat_json = serde_json::to_string(&heartbeat).unwrap();
let msg = VoiceGatewayMessage(heartbeat_json);
let send_result = websocket_tx.lock().await.send(msg.into()).await;
if send_result.is_err() {
// We couldn't send, the websocket is broken
warn!("VGW: Couldnt send heartbeat, websocket seems broken");
break;
}
last_heartbeat_timestamp = Instant::now();
last_heartbeat_acknowledged = false;
}
}
}
}
/// Used for communications between the voice heartbeat and voice gateway thread.
/// Either signifies a nonce update, a heartbeat ACK or a Heartbeat request by the server
#[derive(Clone, Copy, Debug)]
pub(super) struct VoiceHeartbeatThreadCommunication {
/// The opcode for the communication we received, if relevant
pub(super) op_code: Option<u8>,
/// The new nonce to use, if any
pub(super) updated_nonce: Option<u64>,
}

View File

@ -0,0 +1,42 @@
use crate::{errors::VoiceGatewayError, types::VoiceGatewayReceivePayload};
/// Represents a messsage received from the voice websocket connection.
///
/// This will be either a [VoiceGatewayReceivePayload], containing voice gateway events, or a [VoiceGatewayError].
///
/// This struct is used internally when handling messages.
#[derive(Clone, Debug)]
pub struct VoiceGatewayMessage(pub String);
impl VoiceGatewayMessage {
/// Parses the message as an error;
/// Returns the error if succesfully parsed, None if the message isn't an error
pub fn error(&self) -> Option<VoiceGatewayError> {
// Some error strings have dots on the end, which we don't care about
let processed_content = self.0.to_lowercase().replace('.', "");
match processed_content.as_str() {
"unknown opcode" | "4001" => Some(VoiceGatewayError::UnknownOpcode),
"decode error" | "failed to decode payload" | "4002" => {
Some(VoiceGatewayError::FailedToDecodePayload)
}
"not authenticated" | "4003" => Some(VoiceGatewayError::NotAuthenticated),
"authentication failed" | "4004" => Some(VoiceGatewayError::AuthenticationFailed),
"already authenticated" | "4005" => Some(VoiceGatewayError::AlreadyAuthenticated),
"session is no longer valid" | "4006" => Some(VoiceGatewayError::SessionNoLongerValid),
"session timeout" | "4009" => Some(VoiceGatewayError::SessionTimeout),
"server not found" | "4011" => Some(VoiceGatewayError::ServerNotFound),
"unknown protocol" | "4012" => Some(VoiceGatewayError::UnknownProtocol),
"disconnected" | "4014" => Some(VoiceGatewayError::Disconnected),
"voice server crashed" | "4015" => Some(VoiceGatewayError::VoiceServerCrashed),
"unknown encryption mode" | "4016" => Some(VoiceGatewayError::UnknownEncryptionMode),
_ => None,
}
}
/// Parses the message as a payload;
/// Returns a result of deserializing
pub fn payload(&self) -> Result<VoiceGatewayReceivePayload, serde_json::Error> {
serde_json::from_str(&self.0)
}
}

11
src/voice/gateway/mod.rs Normal file
View File

@ -0,0 +1,11 @@
pub mod backends;
pub mod events;
pub mod gateway;
pub mod handle;
pub mod heartbeat;
pub mod message;
pub use backends::*;
pub use gateway::*;
pub use handle::*;
pub use message::*;

154
src/voice/handler.rs Normal file
View File

@ -0,0 +1,154 @@
use std::{net::SocketAddrV4, sync::Arc};
use async_trait::async_trait;
use tokio::sync::{Mutex, RwLock};
use crate::{
gateway::Observer,
types::{
GatewayReady, SelectProtocol, SelectProtocolData, SessionDescription, Snowflake,
VoiceEncryptionMode, VoiceIdentify, VoiceProtocol, VoiceReady, VoiceServerUpdate,
},
};
use super::{
gateway::{VoiceGateway, VoiceGatewayHandle},
udp::UdpHandle,
udp::UdpHandler,
voice_data::VoiceData,
};
/// Handles inbetween connections between the gateway and udp modules
#[derive(Debug, Clone)]
pub struct VoiceHandler {
pub voice_gateway_connection: Arc<Mutex<Option<VoiceGatewayHandle>>>,
pub voice_udp_connection: Arc<Mutex<Option<UdpHandle>>>,
pub data: Arc<RwLock<VoiceData>>,
}
impl VoiceHandler {
/// Creates a new voicehandler, only initializing the data
pub fn new() -> VoiceHandler {
Self {
data: Arc::new(RwLock::new(VoiceData::default())),
voice_gateway_connection: Arc::new(Mutex::new(None)),
voice_udp_connection: Arc::new(Mutex::new(None)),
}
}
}
impl Default for VoiceHandler {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
// On [VoiceServerUpdate] we get our starting data and url for the voice gateway server.
impl Observer<VoiceServerUpdate> for VoiceHandler {
async fn update(&self, data: &VoiceServerUpdate) {
let mut data_lock = self.data.write().await;
data_lock.server_data = Some(data.clone());
let user_id = data_lock.user_id;
let session_id = data_lock.session_id.clone();
drop(data_lock);
let voice_gateway_handle = VoiceGateway::spawn(data.endpoint.clone().unwrap())
.await
.unwrap();
let server_id: Snowflake;
if data.guild_id.is_some() {
server_id = data.guild_id.unwrap();
} else {
server_id = data.channel_id.unwrap();
}
let voice_identify = VoiceIdentify {
server_id,
user_id,
session_id,
token: data.token.clone(),
video: Some(false),
};
voice_gateway_handle.send_identify(voice_identify).await;
let cloned_gateway_handle = voice_gateway_handle.clone();
let mut voice_events = cloned_gateway_handle.events.lock().await;
let self_reference = Arc::new(self.clone());
voice_events.voice_ready.subscribe(self_reference.clone());
voice_events
.session_description
.subscribe(self_reference.clone());
*self.voice_gateway_connection.lock().await = Some(voice_gateway_handle);
}
}
#[async_trait]
// On [VoiceReady] we get info for establishing a UDP connection, and we immedietly need said UDP
// connection for ip discovery.
impl Observer<VoiceReady> for VoiceHandler {
async fn update(&self, data: &VoiceReady) {
let mut data_lock = self.data.write().await;
data_lock.ready_data = Some(data.clone());
drop(data_lock);
let udp_handle = UdpHandler::spawn(
self.data.clone(),
std::net::SocketAddr::V4(SocketAddrV4::new(data.ip, data.port)),
data.ssrc,
)
.await
.unwrap();
let ip_discovery = self.data.read().await.ip_discovery.clone().unwrap();
*self.voice_udp_connection.lock().await = Some(udp_handle.clone());
let string_ip_address = String::from_utf8(ip_discovery.address).expect("Ip discovery gave non string ip");
self.voice_gateway_connection
.lock()
.await
.clone()
.unwrap()
.send_select_protocol(SelectProtocol {
protocol: VoiceProtocol::Udp,
data: SelectProtocolData {
address: string_ip_address,
port: ip_discovery.port,
mode: VoiceEncryptionMode::Xsalsa20Poly1305,
},
..Default::default()
})
.await;
}
}
#[async_trait]
// Session descryption gives us final info regarding codecs and our encryption key
impl Observer<SessionDescription> for VoiceHandler {
async fn update(&self, data: &SessionDescription) {
let mut data_write = self.data.write().await;
data_write.session_description = Some(data.clone());
drop(data_write);
}
}
#[async_trait]
impl Observer<GatewayReady> for VoiceHandler {
async fn update(&self, data: &GatewayReady) {
let mut lock = self.data.write().await;
lock.user_id = data.user.id;
lock.session_id = data.session_id.clone();
drop(lock);
}
}

15
src/voice/mod.rs Normal file
View File

@ -0,0 +1,15 @@
//! Module for all voice functionality within chorus.
mod crypto;
#[cfg(feature = "voice_gateway")]
pub mod gateway;
#[cfg(all(feature = "voice_udp", feature = "voice_gateway"))]
pub mod handler;
#[cfg(feature = "voice_udp")]
pub mod udp;
#[cfg(feature = "voice_udp")]
pub mod voice_data;
// Pub use this so users can interact with packet types if they want
#[cfg(feature = "voice_udp")]
pub use discortp;

View File

@ -0,0 +1,12 @@
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_udp"))]
pub mod tokio;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_udp"))]
pub use tokio::*;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_udp"))]
pub type UdpSocket = tokio::TokioSocket;
#[cfg(all(not(target_arch = "wasm32"), feature = "voice_udp"))]
pub type UdpBackend = tokio::TokioBackend;
#[cfg(all(target_arch = "wasm32", feature = "voice_udp"))]
compile_error!("UDP Voice support is not (and will likely never be) supported for WASM. This is because UDP cannot be used in the browser. We are however looking into Webrtc for WASM voice support.");

View File

@ -0,0 +1,33 @@
use std::net::SocketAddr;
use crate::errors::VoiceUdpError;
#[derive(Debug, Clone)]
pub struct TokioBackend;
pub type TokioSocket = tokio::net::UdpSocket;
impl TokioBackend {
pub async fn connect(url: SocketAddr) -> Result<TokioSocket, VoiceUdpError> {
// Bind with a port number of 0, so the os assigns this listener a port
let udp_socket_result = TokioSocket::bind("0.0.0.0:0").await;
if let Err(e) = udp_socket_result {
return Err(VoiceUdpError::CannotBind {
error: format!("{:?}", e),
});
}
let udp_socket = udp_socket_result.unwrap();
let connection_result = udp_socket.connect(url).await;
if let Err(e) = connection_result {
return Err(VoiceUdpError::CannotConnect {
error: format!("{:?}", e),
});
}
Ok(udp_socket)
}
}

21
src/voice/udp/events.rs Normal file
View File

@ -0,0 +1,21 @@
use discortp::{rtcp::Rtcp, rtp::Rtp};
use crate::{gateway::GatewayEvent, types::WebSocketEvent};
impl WebSocketEvent for Rtp {}
impl WebSocketEvent for Rtcp {}
#[derive(Debug)]
pub struct VoiceUDPEvents {
pub rtp: GatewayEvent<Rtp>,
pub rtcp: GatewayEvent<Rtcp>,
}
impl Default for VoiceUDPEvents {
fn default() -> Self {
Self {
rtp: GatewayEvent::new(),
rtcp: GatewayEvent::new(),
}
}
}

223
src/voice/udp/handle.rs Normal file
View File

@ -0,0 +1,223 @@
use std::sync::Arc;
use crypto_secretbox::{
aead::Aead, cipher::generic_array::GenericArray, KeyInit, XSalsa20Poly1305,
};
use discortp::Packet;
use getrandom::getrandom;
use log::*;
use tokio::{sync::Mutex, sync::RwLock};
use super::UdpSocket;
use crate::{
errors::VoiceUdpError,
types::VoiceEncryptionMode,
voice::{crypto::get_xsalsa20_poly1305_nonce, voice_data::VoiceData},
};
use super::{events::VoiceUDPEvents, RTP_HEADER_SIZE};
/// Handle to a voice udp connection
///
/// Can be safely cloned and will still correspond to the same connection.
#[derive(Debug, Clone)]
pub struct UdpHandle {
pub events: Arc<Mutex<VoiceUDPEvents>>,
pub(super) socket: Arc<UdpSocket>,
pub data: Arc<RwLock<VoiceData>>,
}
impl UdpHandle {
/// Constructs and sends encoded opus rtp data.
///
/// Automatically makes an [RtpPacket](discortp::rtp::RtpPacket), encrypts it and sends it.
///
/// # Errors
/// If we do not have VoiceReady data, which contains our ssrc, this returns a
/// [VoiceUdpError::NoData] error.
///
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_opus_data(
&self,
timestamp: u32,
payload: Vec<u8>,
) -> Result<(), VoiceUdpError> {
let voice_ready_data_result = self.data.read().await.ready_data.clone();
if voice_ready_data_result.is_none() {
return Err(VoiceUdpError::NoData);
}
let ssrc = voice_ready_data_result.unwrap().ssrc;
let sequence_number = self.data.read().await.last_sequence_number.wrapping_add(1);
self.data.write().await.last_sequence_number = sequence_number;
let payload_len = payload.len();
let rtp_data = discortp::rtp::Rtp {
// Always the same
version: 2,
padding: 0,
extension: 0,
csrc_count: 0,
csrc_list: Vec::new(),
marker: 0,
payload_type: discortp::rtp::RtpType::Dynamic(120),
// Actually variable
sequence: sequence_number.into(),
timestamp: timestamp.into(),
ssrc,
payload,
};
let buffer_size = payload_len + RTP_HEADER_SIZE as usize;
let mut buffer = vec![0; buffer_size];
let mut rtp_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).expect("Mangled rtp packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
rtp_packet.populate(&rtp_data);
self.send_rtp_packet(rtp_packet).await
}
/// Encrypts and sends and rtp packet.
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_rtp_packet(
&self,
packet: discortp::rtp::MutableRtpPacket<'_>,
) -> Result<(), VoiceUdpError> {
let mut buffer = self.encrypt_rtp_packet_payload(&packet).await?;
let new_packet = discortp::rtp::MutableRtpPacket::new(&mut buffer).unwrap();
self.send_encrypted_rtp_packet(new_packet.consume_to_immutable())
.await?;
Ok(())
}
/// Encrypts an unencrypted rtp packet, returning a copy of the packet's bytes with an
/// encrypted payload
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// When using voice encryption modes which require special nonce generation, and said generation fails, this returns a [VoiceUdpError::FailedNonceGeneration] error.
pub async fn encrypt_rtp_packet_payload(
&self,
packet: &discortp::rtp::MutableRtpPacket<'_>,
) -> Result<Vec<u8>, VoiceUdpError> {
let payload = packet.payload();
let session_description_result = self.data.read().await.session_description.clone();
// We are trying to encrypt, but have not received SessionDescription yet,
// which contains the secret key.
if session_description_result.is_none() {
return Err(VoiceUdpError::NoKey);
}
let session_description = session_description_result.unwrap();
let mut nonce_bytes = match session_description.encryption_mode {
VoiceEncryptionMode::Xsalsa20Poly1305 => get_xsalsa20_poly1305_nonce(packet.packet()),
VoiceEncryptionMode::Xsalsa20Poly1305Suffix => {
// Generate 24 random bytes
let mut random_destinaton: Vec<u8> = vec![0; 24];
let random_result = getrandom(&mut random_destinaton);
if let Err(e) = random_result {
return Err(VoiceUdpError::FailedNonceGeneration {
error: format!("{:?}", e),
});
}
random_destinaton
}
VoiceEncryptionMode::Xsalsa20Poly1305Lite => {
// "Incremental 4 bytes (32bit) int value"
let mut data_lock = self.data.write().await;
let nonce = data_lock
.last_udp_encryption_nonce
.unwrap_or_default()
.wrapping_add(1);
data_lock.last_udp_encryption_nonce = Some(nonce);
drop(data_lock);
// TODO: Is le correct? This is not documented anywhere
let mut bytes = nonce.to_le_bytes().to_vec();
// This is 4 bytes, it has to be 24, so we need to append 20
while bytes.len() < 24 {
bytes.push(0);
}
bytes
}
_ => {
// TODO: Implement aead_aes256_gcm
todo!("This voice encryption mode is not yet implemented.");
}
};
let nonce = GenericArray::from_slice(&nonce_bytes);
let key = GenericArray::from_slice(&session_description.secret_key);
let encryptor = XSalsa20Poly1305::new(key);
let encryption_result = encryptor.encrypt(nonce, payload);
if encryption_result.is_err() {
// Safety: If encryption errors here, it's chorus' fault, and it makes no sense to
// return the error to the user.
//
// This is not an error the user should account for, which is why we throw it here.
panic!("{}", VoiceUdpError::FailedEncryption);
}
let mut encrypted_payload = encryption_result.unwrap();
// Append the nonce bytes, if needed
// All other encryption modes have an explicit nonce, where as Xsalsa20Poly1305
// has the nonce as the rtp header.
if session_description.encryption_mode != VoiceEncryptionMode::Xsalsa20Poly1305 {
encrypted_payload.append(&mut nonce_bytes);
}
// We need to allocate a new buffer, since the old one is too small for our new encrypted
// data
let buffer_size = encrypted_payload.len() + RTP_HEADER_SIZE as usize;
let mut new_buffer: Vec<u8> = Vec::with_capacity(buffer_size);
let mut rtp_header = packet.packet().to_vec()[0..RTP_HEADER_SIZE as usize].to_vec();
new_buffer.append(&mut rtp_header);
new_buffer.append(&mut encrypted_payload);
Ok(new_buffer)
}
/// Sends an (already encrypted) rtp packet to the connection.
///
/// # Errors
/// If the Udp socket is broken, this returns a [VoiceUdpError::BrokenSocket] error.
pub async fn send_encrypted_rtp_packet(
&self,
packet: discortp::rtp::RtpPacket<'_>,
) -> Result<(), VoiceUdpError> {
let raw_bytes = packet.packet();
let send_res = self.socket.send(raw_bytes).await;
if let Err(e) = send_res {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
debug!("VUDP: Sent rtp packet!");
Ok(())
}
}

333
src/voice/udp/handler.rs Normal file
View File

@ -0,0 +1,333 @@
use std::{net::SocketAddr, sync::Arc};
use crypto_secretbox::aead::Aead;
use crypto_secretbox::cipher::generic_array::GenericArray;
use crypto_secretbox::KeyInit;
use crypto_secretbox::XSalsa20Poly1305;
use discortp::demux::Demuxed;
use discortp::discord::{
IpDiscovery, IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket,
};
use discortp::rtcp::report::ReceiverReport;
use discortp::rtcp::report::SenderReport;
use discortp::{demux::demux, Packet};
use tokio::sync::{Mutex, RwLock};
use super::UdpBackend;
use super::UdpSocket;
use super::RTP_HEADER_SIZE;
use crate::errors::VoiceUdpError;
use crate::types::VoiceEncryptionMode;
use crate::voice::crypto::get_xsalsa20_poly1305_lite_nonce;
use crate::voice::crypto::get_xsalsa20_poly1305_nonce;
use crate::voice::crypto::get_xsalsa20_poly1305_suffix_nonce;
use crate::voice::voice_data::VoiceData;
use super::{events::VoiceUDPEvents, UdpHandle};
use log::*;
#[derive(Debug)]
/// The main UDP struct, which handles receiving, parsing and decrypting the rtp packets
pub struct UdpHandler {
events: Arc<Mutex<VoiceUDPEvents>>,
pub data: Arc<RwLock<VoiceData>>,
socket: Arc<UdpSocket>,
}
impl UdpHandler {
/// Spawns a new udp handler and performs ip discovery.
///
/// Mutates the given data_reference with the ip discovery data.
pub async fn spawn(
data_reference: Arc<RwLock<VoiceData>>,
url: SocketAddr,
ssrc: u32,
) -> Result<UdpHandle, VoiceUdpError> {
let udp_socket = UdpBackend::connect(url).await?;
// First perform ip discovery
let ip_discovery = IpDiscovery {
pkt_type: IpDiscoveryType::Request,
ssrc,
length: 70,
address: Vec::new(),
port: 0,
payload: Vec::new(),
};
// Minimum size with an empty Address value, + 64 bytes for the actual address size
let size = IpDiscoveryPacket::minimum_packet_size() + 64;
let mut buf: Vec<u8> = vec![0; size];
// Safety: expect is justified here, since this is an error which should never happen.
// If this errors, the code at fault is the buffer size calculation.
let mut ip_discovery_packet =
MutableIpDiscoveryPacket::new(&mut buf).expect("Mangled ip discovery packet creation buffer, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
ip_discovery_packet.populate(&ip_discovery);
let data = ip_discovery_packet.packet();
info!("VUDP: Sending Ip Discovery {:?}", &data);
let send_res = udp_socket.send(data).await;
if let Err(e) = send_res {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
info!("VUDP: Sent packet discovery request");
// Handle the ip discovery response
let received_size_or_err = udp_socket.recv(&mut buf).await;
if let Err(e) = received_size_or_err {
return Err(VoiceUdpError::BrokenSocket {
error: format!("{:?}", e),
});
}
let received_size = received_size_or_err.unwrap();
info!(
"VUDP: Receiving messsage: {:?} - (expected {} vs real {})",
buf.clone(),
size,
received_size
);
let receieved_ip_discovery = IpDiscoveryPacket::new(&buf).expect("Could not make ipdiscovery packet from received data, something is very wrong. Please open an issue on the chorus github: https://github.com/polyphony-chat/chorus/issues/new");
info!(
"VUDP: Received ip discovery!!! {:?}",
receieved_ip_discovery
);
let ip_discovery = IpDiscovery {
pkt_type: receieved_ip_discovery.get_pkt_type(),
length: receieved_ip_discovery.get_length(),
ssrc: receieved_ip_discovery.get_ssrc(),
address: receieved_ip_discovery.get_address(),
port: receieved_ip_discovery.get_port(),
payload: Vec::new(),
};
let mut data_reference_lock = data_reference.write().await;
data_reference_lock.ip_discovery = Some(ip_discovery);
drop(data_reference_lock);
let socket = Arc::new(udp_socket);
let events = VoiceUDPEvents::default();
let shared_events = Arc::new(Mutex::new(events));
let mut handler = UdpHandler {
events: shared_events.clone(),
data: data_reference.clone(),
socket: socket.clone(),
};
// Now we can continuously check for messages in a different task
tokio::spawn(async move {
handler.listen_task().await;
});
Ok(UdpHandle {
events: shared_events,
socket,
data: data_reference,
})
}
/// The main listen task;
///
/// Receives udp messages and parses them.
async fn listen_task(&mut self) {
loop {
// FIXME: is there a max size for these packets?
// Allocating 512 bytes seems a bit extreme
//
// Update: see <https://stackoverflow.com/questions/58097580/rtp-packet-maximum-size>
// > "The RTP standard does not set a maximum size.."
//
// The theorhetical max for this buffer would be 1458 bytes, but that is imo
// unreasonable to allocate for every message.
let mut buf: Vec<u8> = vec![0; 512];
let result = self.socket.recv(&mut buf).await;
if let Ok(size) = result {
self.handle_message(&buf[0..size]).await;
continue;
}
warn!("VUDP: Voice UDP is broken, closing connection");
break;
}
}
/// Handles a message buf
async fn handle_message(&self, buf: &[u8]) {
let parsed = demux(buf);
match parsed {
Demuxed::Rtp(rtp) => {
trace!("VUDP: Parsed packet as rtp! {:?}", buf);
let decryption_result = self.decrypt_rtp_packet_payload(&rtp).await;
if let Err(err) = decryption_result {
match err {
VoiceUdpError::NoKey => {
warn!("VUDP: Received encyrpted voice data, but no encryption key, CANNOT DECRYPT!");
return;
}
VoiceUdpError::FailedDecryption => {
warn!("VUDP: Failed to decrypt voice data!");
return;
}
_ => {
unreachable!();
}
}
}
let decrypted = decryption_result.unwrap();
debug!("VUDP: Successfully decrypted voice data!");
let rtp_with_decrypted_data = discortp::rtp::Rtp {
ssrc: rtp.get_ssrc(),
marker: rtp.get_marker(),
version: rtp.get_version(),
padding: rtp.get_padding(),
sequence: rtp.get_sequence(),
extension: rtp.get_extension(),
timestamp: rtp.get_timestamp(),
csrc_list: rtp.get_csrc_list(),
csrc_count: rtp.get_csrc_count(),
payload_type: rtp.get_payload_type(),
payload: decrypted,
};
self.events
.lock()
.await
.rtp
.notify(rtp_with_decrypted_data)
.await;
}
Demuxed::Rtcp(rtcp) => {
trace!("VUDP: Parsed packet as rtcp!");
let rtcp_data = match rtcp {
discortp::rtcp::RtcpPacket::KnownType(knowntype) => {
discortp::rtcp::Rtcp::KnownType(knowntype)
}
discortp::rtcp::RtcpPacket::SenderReport(senderreport) => {
discortp::rtcp::Rtcp::SenderReport(SenderReport {
payload: senderreport.payload().to_vec(),
padding: senderreport.get_padding(),
version: senderreport.get_version(),
ssrc: senderreport.get_ssrc(),
pkt_length: senderreport.get_pkt_length(),
packet_type: senderreport.get_packet_type(),
rx_report_count: senderreport.get_rx_report_count(),
})
}
discortp::rtcp::RtcpPacket::ReceiverReport(receiverreport) => {
discortp::rtcp::Rtcp::ReceiverReport(ReceiverReport {
payload: receiverreport.payload().to_vec(),
padding: receiverreport.get_padding(),
version: receiverreport.get_version(),
ssrc: receiverreport.get_ssrc(),
pkt_length: receiverreport.get_pkt_length(),
packet_type: receiverreport.get_packet_type(),
rx_report_count: receiverreport.get_rx_report_count(),
})
}
_ => {
unreachable!();
}
};
self.events.lock().await.rtcp.notify(rtcp_data).await;
}
Demuxed::FailedParse(e) => {
trace!("VUDP: Failed to parse packet: {:?}", e);
}
Demuxed::TooSmall => {
unreachable!()
}
}
}
/// Decrypts an encrypted rtp packet, returning a decrypted copy of the packet's payload
/// bytes.
///
/// # Errors
/// If we have not received an encryption key, this returns a [VoiceUdpError::NoKey] error.
///
/// If the decryption fails, this returns a [VoiceUdpError::FailedDecryption].
pub async fn decrypt_rtp_packet_payload(
&self,
rtp: &discortp::rtp::RtpPacket<'_>,
) -> Result<Vec<u8>, VoiceUdpError> {
let packet_bytes = rtp.packet();
let mut ciphertext: Vec<u8> =
packet_bytes[(RTP_HEADER_SIZE as usize)..packet_bytes.len()].to_vec();
let session_description_result = self.data.read().await.session_description.clone();
// We are trying to decrypt, but have not received SessionDescription yet,
// which contains the secret key
if session_description_result.is_none() {
return Err(VoiceUdpError::NoKey);
}
let session_description = session_description_result.unwrap();
let nonce_bytes = match session_description.encryption_mode {
VoiceEncryptionMode::Xsalsa20Poly1305 => get_xsalsa20_poly1305_nonce(packet_bytes),
VoiceEncryptionMode::Xsalsa20Poly1305Suffix => {
// Remove the suffix from the ciphertext
ciphertext = ciphertext[0..ciphertext.len() - 24].to_vec();
get_xsalsa20_poly1305_suffix_nonce(packet_bytes)
}
// Note: Rtpsize is documented by userdoccers to be the same, yet decryption
// doesn't work.
//
// I have no idea how Rtpsize works.
VoiceEncryptionMode::Xsalsa20Poly1305Lite => {
// Remove the suffix from the ciphertext
ciphertext = ciphertext[0..ciphertext.len() - 4].to_vec();
get_xsalsa20_poly1305_lite_nonce(packet_bytes)
}
_ => {
// TODO: Implement aead_aes256_gcm
todo!("This voice encryption mode is not yet implemented.");
}
};
let nonce = GenericArray::from_slice(&nonce_bytes);
let key = GenericArray::from_slice(&session_description.secret_key);
let decryptor = XSalsa20Poly1305::new(key);
let decryption_result = decryptor.decrypt(nonce, ciphertext.as_ref());
// Note: this may seem like we are throwing away valuable error handling data,
// but the decryption error provides no extra info.
if decryption_result.is_err() {
return Err(VoiceUdpError::FailedDecryption);
}
Ok(decryption_result.unwrap())
}
}

14
src/voice/udp/mod.rs Normal file
View File

@ -0,0 +1,14 @@
//! Defines the udp component of voice communications, sending and receiving raw rtp data.
/// See <https://discord-userdoccers.vercel.app/topics/voice-connections#voice-packet-structure>
/// This always adds up to 12 bytes
const RTP_HEADER_SIZE: u8 = 12;
pub mod backends;
pub mod events;
pub mod handle;
pub mod handler;
pub use backends::*;
pub use handle::*;
pub use handler::*;

21
src/voice/voice_data.rs Normal file
View File

@ -0,0 +1,21 @@
use discortp::discord::IpDiscovery;
use crate::types::{SessionDescription, Snowflake, VoiceReady, VoiceServerUpdate};
#[derive(Debug, Default)]
/// Saves data shared between parts of the voice architecture;
///
/// Struct used to give the Udp connection data received from the gateway.
pub struct VoiceData {
pub server_data: Option<VoiceServerUpdate>,
pub ready_data: Option<VoiceReady>,
pub session_description: Option<SessionDescription>,
pub user_id: Snowflake,
pub session_id: String,
/// The last sequence number we used, has to be incremeted by one every time we send a message
pub last_sequence_number: u16,
pub ip_discovery: Option<IpDiscovery>,
/// The last udp encryption nonce, if we are using an encryption mode with incremental nonces.
pub last_udp_encryption_nonce: Option<u32>,
}

View File

@ -1,15 +1,22 @@
mod common;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use async_trait::async_trait;
use chorus::errors::GatewayError;
use chorus::gateway::*;
use chorus::types::{self, ChannelModifySchema, RoleCreateModifySchema, RoleObject};
use chorus::types::{self, ChannelModifySchema, GatewayReady, RoleCreateModifySchema, RoleObject};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::*;
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test_configure!(run_in_browser);
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::sleep;
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
/// Tests establishing a connection (hello and heartbeats) on the local gateway;
@ -20,6 +27,18 @@ async fn test_gateway_establish() {
common::teardown(bundle).await
}
#[derive(Debug)]
struct GatewayReadyObserver {
channel: tokio::sync::mpsc::Sender<()>,
}
#[async_trait]
impl Observer<GatewayReady> for GatewayReadyObserver {
async fn update(&self, _data: &GatewayReady) {
self.channel.send(()).await.unwrap();
}
}
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
/// Tests establishing a connection and authenticating
@ -28,10 +47,35 @@ async fn test_gateway_authenticate() {
let gateway: GatewayHandle = Gateway::spawn(bundle.urls.wss.clone()).await.unwrap();
let (ready_send, mut ready_receive) = tokio::sync::mpsc::channel(1);
let observer = Arc::new(GatewayReadyObserver {
channel: ready_send,
});
gateway
.events
.lock()
.await
.session
.ready
.subscribe(observer);
let mut identify = types::GatewayIdentifyPayload::common();
identify.token = bundle.user.token.clone();
gateway.send_identify(identify).await;
tokio::select! {
// Fail, we timed out waiting for it
() = sleep(Duration::from_secs(20)) => {
println!("Timed out waiting for event, failing..");
assert!(false);
}
// Sucess, we have received it
Some(_) = ready_receive.recv() => {}
};
common::teardown(bundle).await
}