diff options
| author | ilotterytea <iltsu@alright.party> | 2025-04-08 15:10:00 +0500 |
|---|---|---|
| committer | ilotterytea <iltsu@alright.party> | 2025-04-08 15:10:00 +0500 |
| commit | bf2de4e8ac6226b3c0dcad57a6f7c68c6449ac76 (patch) | |
| tree | 9867dc536f68b71393d8c051efbdad7b50d69137 | |
| parent | 8b156e7e62a5e8c36c671ba7a4ea7cdc39635c05 (diff) | |
feat: betterttv websocket client
| -rw-r--r-- | Cargo.lock | 247 | ||||
| -rw-r--r-- | Cargo.toml | 3 | ||||
| -rw-r--r-- | src/betterttv.rs | 197 | ||||
| -rw-r--r-- | src/emotes.rs | 6 |
4 files changed, 452 insertions, 1 deletions
@@ -57,6 +57,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" [[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] name = "bumpalo" version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -100,6 +109,41 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010" + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + +[[package]] name = "displaydoc" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -172,12 +216,28 @@ dependencies = [ ] [[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] name = "futures-channel" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -187,6 +247,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "futures-sink" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -204,10 +292,26 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", +] + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", ] [[package]] @@ -705,6 +809,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" [[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] name = "proc-macro2" version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -729,6 +842,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" [[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha", + "rand_core", + "zerocopy", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", +] + +[[package]] name = "reqwest" version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -933,6 +1076,17 @@ dependencies = [ ] [[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] name = "shlex" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1041,6 +1195,26 @@ dependencies = [ ] [[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "tinystr" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1098,6 +1272,20 @@ dependencies = [ ] [[package]] +name = "tokio-tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + +[[package]] name = "tokio-util" version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1163,16 +1351,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] +name = "tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "native-tls", + "rand", + "sha1", + "thiserror", + "utf-8", +] + +[[package]] name = "twitch_emotes" version = "0.1.0" dependencies = [ + "futures", "reqwest", "serde", "serde_json", "tokio", + "tokio-tungstenite", + "tungstenite", ] [[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + +[[package]] name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1196,6 +1411,12 @@ dependencies = [ ] [[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + +[[package]] name = "utf16_iter" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1214,6 +1435,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] name = "want" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1545,6 +1772,26 @@ dependencies = [ ] [[package]] +name = "zerocopy" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "zerofrom" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -6,7 +6,10 @@ version = "0.1.0" edition = "2024" [dependencies] +futures = "0.3.31" reqwest = { version = "0.12.15", features = ["json"] } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" tokio = { version = "1.44.1", default-features = false, features = ["rt", "macros"] } +tokio-tungstenite = { version = "0.26.2", features = ["native-tls"] } +tungstenite = { version = "0.26.2", default-features = false } diff --git a/src/betterttv.rs b/src/betterttv.rs index 1fc99b2..07dae2c 100644 --- a/src/betterttv.rs +++ b/src/betterttv.rs @@ -1,8 +1,14 @@ +use std::collections::HashSet; + +use futures::SinkExt; use reqwest::{Client, Error}; use serde::Deserialize; use serde_json::Value; +use tokio::net::TcpStream; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Result}; +use tungstenite::Message; -use crate::emotes::{EmoteBase, RetrieveEmoteAPI}; +use crate::emotes::{Emote, EmoteBase, RetrieveEmoteAPI, RetrieveEmoteWS}; #[derive(Debug, Deserialize, Clone)] pub struct BetterTTVEmote { @@ -87,3 +93,192 @@ impl RetrieveEmoteAPI<BetterTTVEmote> for BetterTTVAPIClient { Ok(serde_json::from_value(json).unwrap()) } } + +pub struct BetterTTVWSClient { + url: String, + on_emote_create: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + on_emote_update: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + on_emote_delete: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + + joined_channels: HashSet<usize>, + awaiting_channels: HashSet<usize>, +} + +impl BetterTTVWSClient { + pub async fn new() -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Self)> { + let url = "wss://sockets.betterttv.net/ws"; + + let s = connect_async(url).await?; + + Ok(( + s.0, + Self { + url: url.to_string(), + on_emote_create: None, + on_emote_delete: None, + on_emote_update: None, + joined_channels: HashSet::new(), + awaiting_channels: HashSet::new(), + }, + )) + } + + pub async fn process( + &mut self, + stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>, + ) -> Result<()> { + self.join_channels(stream).await; + + tokio::select!(Some(msg) = futures::StreamExt::next(stream) => { + let msg = match msg { + Err(tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)) => { + *stream = connect_async(self.url.clone()).await?.0; + self.await_channels(); + return Ok(()); + } + _ => msg?, + }; + + self.process_message(msg); + }); + + Ok(()) + } + + pub fn join_channel(&mut self, twitch_id: usize) { + if self.awaiting_channels.contains(&twitch_id) || self.joined_channels.contains(&twitch_id) + { + return; + } + + self.awaiting_channels.insert(twitch_id); + } + + fn process_message(&self, msg: Message) { + match msg { + Message::Text(text) => { + let text = text.to_string(); + + let json: serde_json::Value = + serde_json::from_str(&text).expect("Error parsing JSON payload"); + + let event_data = &json["data"]; + + let event_name = json["name"].as_str().expect("No event name"); + + if event_name.eq("emote_create") { + if let Some(func) = &self.on_emote_create { + let emote_data = &event_data["emote"]; + let channel_data = event_data["channel"] + .as_str() + .expect("No channel") + .to_string() + .clone(); + + let emote = Emote { + id: emote_data["id"].as_str().expect("No emote.id").to_string(), + code: emote_data["code"] + .as_str() + .expect("No emote.code") + .to_string(), + original_code: None, + }; + + (func)(channel_data, None, emote); + } + } else if event_name.eq("emote_update") { + if let Some(func) = &self.on_emote_update { + let emote_data = &event_data["emote"]; + let channel_data = event_data["channel"] + .as_str() + .expect("No channel") + .to_string() + .clone(); + + let emote = Emote { + id: emote_data["id"].as_str().expect("No emote.id").to_string(), + code: emote_data["code"] + .as_str() + .expect("No emote.code") + .to_string(), + original_code: None, + }; + + (func)(channel_data, None, emote); + } + } else if event_name.eq("emote_delete") { + if let Some(func) = &self.on_emote_delete { + let emote_id = &event_data["emoteId"]; + let channel_data = event_data["channel"] + .as_str() + .expect("No channel") + .to_string() + .clone(); + + let emote = Emote { + id: emote_id.as_str().expect("No emoteId").to_string(), + code: "".into(), + original_code: None, + }; + + (func)(channel_data, None, emote); + } + } + } + _ => {} + } + } + + async fn join_channels(&mut self, stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>) { + for id in &self.awaiting_channels { + let json = serde_json::json!({ + "name": "join_channel", + "data": { + "name": format!("twitch:{}", id) + } + }); + + stream + .send(Message::Text( + serde_json::to_string(&json) + .expect("Error converting JSON to String") + .into(), + )) + .await + .expect("Error sending join request"); + + self.joined_channels.insert(*id); + } + + self.awaiting_channels.clear(); + } + + fn await_channels(&mut self) { + let c = self.joined_channels.clone(); + self.awaiting_channels.extend(c); + self.joined_channels.clear(); + } +} + +impl RetrieveEmoteWS<Emote> for BetterTTVWSClient { + fn on_emote_create( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_create = Some(Box::new(func)); + } + + fn on_emote_update( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_update = Some(Box::new(func)); + } + + fn on_emote_delete( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_delete = Some(Box::new(func)); + } +} diff --git a/src/emotes.rs b/src/emotes.rs index 608f7a9..1fa932c 100644 --- a/src/emotes.rs +++ b/src/emotes.rs @@ -32,3 +32,9 @@ pub trait RetrieveEmoteAPI<T> { async fn get_channel_emotes(&self, channel_login: &str) -> Result<Vec<T>, Error>; async fn get_global_emotes(&self) -> Result<Vec<T>, Error>; } + +pub trait RetrieveEmoteWS<T> { + fn on_emote_create(&mut self, func: &'static (dyn Fn(String, Option<String>, T) + Send + Sync)); + fn on_emote_update(&mut self, func: &'static (dyn Fn(String, Option<String>, T) + Send + Sync)); + fn on_emote_delete(&mut self, func: &'static (dyn Fn(String, Option<String>, T) + Send + Sync)); +} |
