diff options
| author | ilotterytea <iltsu@alright.party> | 2025-04-08 16:38:04 +0500 |
|---|---|---|
| committer | ilotterytea <iltsu@alright.party> | 2025-04-08 16:38:04 +0500 |
| commit | 317df3c75633dcb74ab89684d02b6205dec91d1e (patch) | |
| tree | b61897fa339bedc55d597485a0f7f50bbd3940bd | |
| parent | bf2de4e8ac6226b3c0dcad57a6f7c68c6449ac76 (diff) | |
feat: 7tv websocket client
| -rw-r--r-- | src/seventv.rs | 256 |
1 files changed, 255 insertions, 1 deletions
diff --git a/src/seventv.rs b/src/seventv.rs index 97d462b..4ca225a 100644 --- a/src/seventv.rs +++ b/src/seventv.rs @@ -1,7 +1,15 @@ +use std::collections::HashSet; + +use futures::SinkExt; use reqwest::{Client, Error}; use serde_json::Value; +use tokio::net::TcpStream; +use tokio_tungstenite::{ + MaybeTlsStream, WebSocketStream, connect_async, connect_async_with_config, +}; +use tungstenite::{Message, Result, protocol::WebSocketConfig}; -use crate::emotes::{Emote, RetrieveEmoteAPI}; +use crate::emotes::{Emote, RetrieveEmoteAPI, RetrieveEmoteWS}; pub struct SevenTVAPIClient { client: Client, @@ -82,3 +90,249 @@ impl RetrieveEmoteAPI<Emote> for SevenTVAPIClient { Ok(emotes) } } + +pub struct SevenTVWSClient { + url: String, + on_emote_create: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + on_emote_update: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + on_emote_delete: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + + joined_channels: HashSet<String>, + awaiting_channels: HashSet<String>, + + identified: bool, +} + +impl SevenTVWSClient { + pub async fn new() -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Self)> { + let url = "wss://events.7tv.io/v3"; + + let config = WebSocketConfig::default(); + + let (socket, _) = connect_async_with_config(url, Some(config), false).await?; + + Ok(( + socket, + Self { + url: url.to_string(), + on_emote_create: None, + on_emote_delete: None, + on_emote_update: None, + joined_channels: HashSet::new(), + awaiting_channels: HashSet::new(), + identified: false, + }, + )) + } + + pub async fn process( + &mut self, + stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>, + ) -> Result<()> { + if self.identified { + self.join_channels(stream).await; + } + + tokio::select!(Some(msg) = futures::StreamExt::next(stream) => { + let msg = match msg { + Err(tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)) => { + *stream = connect_async(self.url.clone()).await?.0; + self.await_channels(); + return Ok(()); + } + _ => msg?, + }; + + self.process_message(msg, stream).await; + }); + + Ok(()) + } + + pub fn join_channel(&mut self, twitch_id: String) { + if self.awaiting_channels.contains(&twitch_id) || self.joined_channels.contains(&twitch_id) + { + return; + } + + self.awaiting_channels.insert(twitch_id); + } + + async fn process_message( + &mut self, + msg: Message, + stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>, + ) { + match msg { + Message::Text(text) => { + let text = text.to_string(); + + let json: serde_json::Value = + serde_json::from_str(&text).expect("Error parsing JSON payload"); + + let operation_code = json["op"].as_i64().expect("No op code"); + + // unsupported operation + if operation_code == 1 { + self.join_channels(stream).await; + self.identified = true; + return; + } else if operation_code != 0 { + return; + } + + let event_data = &json["d"]; + + if event_data["type"] + .as_str() + .expect("No d.type") + .ne("emote_set.update") + { + return; + } + + let event_data = &event_data["body"]; + + let channel = event_data["id"].as_str().expect("No body.id").to_string(); + + let actor_data = &event_data["actor"]; + + let author = Some( + actor_data["id"] + .as_str() + .expect("No body.actor.id") + .to_string(), + ); + + if let (Some(pushed), Some(func)) = + (event_data["pushed"].as_array(), &self.on_emote_create) + { + for emote in pushed { + let emote = &emote["value"]; + (func)(channel.clone(), author.clone(), self.create_emote(emote)); + } + } + + if let (Some(pulled), Some(func)) = + (event_data["pulled"].as_array(), &self.on_emote_delete) + { + for emote in pulled { + let emote = &emote["old_value"]; + (func)(channel.clone(), author.clone(), self.create_emote(emote)); + } + } + + if let (Some(updated), Some(func)) = + (event_data["updated"].as_array(), &self.on_emote_update) + { + for emote in updated { + let old_emote = &emote["old_value"]; + let emote = &emote["value"]; + + let id = old_emote["id"] + .as_str() + .expect("No old_value.id") + .to_string(); + + let code = emote["name"].as_str().expect("No value.name").to_string(); + + let original_code = old_emote["name"] + .as_str() + .expect("No old_value.name") + .to_string(); + + let emote = Emote { + id, + original_code: if code.ne(&original_code) { + Some(original_code) + } else { + None + }, + code, + }; + + (func)(channel.clone(), author.clone(), emote); + } + } + } + _ => {} + } + } + + fn create_emote(&self, value: &Value) -> Emote { + let id = value["id"].as_str().expect("No value.id").to_string(); + + let code = value["name"].as_str().expect("No value.name").to_string(); + + let original_code = value["data"]["name"] + .as_str() + .expect("No value.data.name") + .to_string(); + + Emote { + id, + original_code: if code.ne(&original_code) { + Some(original_code) + } else { + None + }, + code, + } + } + + async fn join_channels(&mut self, stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>) { + for id in &self.awaiting_channels { + let json = serde_json::json!({ + "op": 35, + "d": { + "type": "emote_set.update", + "condition": { + "object_id": id + } + } + }); + + stream + .send(Message::Text( + serde_json::to_string(&json) + .expect("Error converting JSON to String") + .into(), + )) + .await + .expect("Error sending join request"); + + self.joined_channels.insert(id.clone()); + } + + self.awaiting_channels.clear(); + } + + fn await_channels(&mut self) { + let c = self.joined_channels.clone(); + self.awaiting_channels.extend(c); + self.joined_channels.clear(); + } +} + +impl RetrieveEmoteWS<Emote> for SevenTVWSClient { + fn on_emote_create( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_create = Some(Box::new(func)); + } + + fn on_emote_update( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_update = Some(Box::new(func)); + } + + fn on_emote_delete( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_delete = Some(Box::new(func)); + } +} |
