use std::{collections::HashSet, sync::Arc}; use futures::SinkExt; use reqwest::{Client, Error}; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::net::TcpStream; use tokio_tungstenite::{ MaybeTlsStream, WebSocketStream, connect_async, connect_async_with_config, }; use tungstenite::{Message, Result, protocol::WebSocketConfig}; use crate::emotes::{Emote, RetrieveEmoteAPI, RetrieveEmoteHandler, RetrieveEmoteWS}; #[derive(Deserialize, Serialize, Clone, Debug)] pub struct User { pub id: String, pub alias_id: usize, pub username: String, pub emote_set_id: String, } #[derive(Deserialize, Serialize, Clone, Debug)] pub struct EmoteSet { pub id: String, pub name: String, pub owner: User, pub emotes: Vec, } pub struct SevenTVAPIClient { client: Client, base_url: String, } impl SevenTVAPIClient { pub fn new() -> Self { Self { client: Client::new(), base_url: "https://7tv.io/v3".into(), } } pub async fn get_user_by_twitch_id(&self, twitch_id: usize) -> Option { let client = Client::new(); let response: serde_json::Value = client .get(format!("{}/users/twitch/{}", self.base_url, twitch_id)) .send() .await .ok()? .error_for_status() .ok()? .json() .await .ok()?; let alias_id = response["id"].as_str()?.parse::().ok()?; let username = response["username"].as_str()?; let emote_set_id = response["emote_set_id"].as_str()?; let id = response["user"]["id"].as_str()?; Some(User { id: id.to_string(), alias_id, username: username.to_string(), emote_set_id: emote_set_id.to_string(), }) } pub async fn get_user_by_id(&self, id: &str) -> Option { let client = Client::new(); let response: serde_json::Value = client .get(format!("{}/users/{}", self.base_url, id)) .send() .await .ok()? .error_for_status() .ok()? .json() .await .ok()?; self.parse_user_json(&response) } pub async fn get_emote_set(&self, emote_set_id: &str) -> Option { let client = Client::new(); let response: serde_json::Value = client .get(format!("{}/emote-sets/{}", self.base_url, emote_set_id)) .send() .await .ok()? .error_for_status() .ok()? .json() .await .ok()?; let id = response["id"].as_str()?; let name = response["name"].as_str()?; let owner = self.parse_user_json(&response["owner"])?; Some(EmoteSet { id: id.to_string(), name: name.to_string(), owner, emotes: self.parse_emoteset(&response), }) } fn parse_user_json(&self, json: &Value) -> Option { let id = json["id"].as_str()?; let connections = json["connections"].as_array()?; let twitch_connection = connections.iter().find(|x| { let Some(platform) = x["platform"].as_str() else { return false; }; platform.eq("TWITCH") })?; Some(User { id: id.to_string(), alias_id: twitch_connection["id"].as_str()?.parse().ok()?, username: twitch_connection["username"].as_str()?.to_string(), emote_set_id: twitch_connection["emote_set_id"].as_str()?.to_string(), }) } fn parse_emoteset(&self, emotesets_json: &Value) -> Vec { let mut emotes = Vec::new(); let emote_values = emotesets_json.get("emotes").unwrap().as_array().unwrap(); for emote_value in emote_values { let id = emote_value.get("id").unwrap().as_str().unwrap().to_string(); let code = emote_value .get("name") .unwrap() .as_str() .unwrap() .to_string(); let o_code = emote_value .get("data") .unwrap() .get("name") .unwrap() .to_string(); let original_code: Option = if code.eq(&o_code) { None } else { Some(o_code) }; emotes.push(Emote { id, code, original_code, }); } emotes } } impl RetrieveEmoteAPI for SevenTVAPIClient { async fn get_channel_emotes(&self, channel_id: &str) -> Result, Error> { let response = self .client .get(format!("{}/users/twitch/{}", self.base_url, channel_id)) .send() .await? .error_for_status()?; let json: Value = response.json().await?; let set = json.get("emote_set").unwrap(); let emotes = self.parse_emoteset(set); Ok(emotes) } async fn get_global_emotes(&self) -> Result, Error> { let response = self .client .get(format!("{}/emote-sets/global", self.base_url)) .send() .await? .error_for_status()?; let json: Value = response.json().await?; let emotes = self.parse_emoteset(&json); Ok(emotes) } } pub struct SevenTVWSClient { url: String, on_emote_create: Option>, on_emote_update: Option>, on_emote_delete: Option>, joined_channels: HashSet, awaiting_channels: HashSet, identified: bool, } impl RetrieveEmoteWS for SevenTVWSClient { fn on_emote_create( &mut self, func: impl Fn(String, Option, Emote) + Send + Sync + 'static, ) { self.on_emote_create = Some(Arc::new(func)); } fn on_emote_update( &mut self, func: impl Fn(String, Option, Emote) + Send + Sync + 'static, ) { self.on_emote_update = Some(Arc::new(func)); } fn on_emote_delete( &mut self, func: impl Fn(String, Option, Emote) + Send + Sync + 'static, ) { self.on_emote_delete = Some(Arc::new(func)); } } impl SevenTVWSClient { pub async fn new() -> Result<(WebSocketStream>, Self)> { let url = "wss://events.7tv.io/v3"; let config = WebSocketConfig::default(); let (socket, _) = connect_async_with_config(url, Some(config), false).await?; Ok(( socket, Self { url: url.to_string(), on_emote_create: None, on_emote_delete: None, on_emote_update: None, joined_channels: HashSet::new(), awaiting_channels: HashSet::new(), identified: false, }, )) } pub async fn process( &mut self, stream: &mut WebSocketStream>, ) -> Result<()> { if self.identified { self.listen_emote_sets(stream).await; } tokio::select!(Some(msg) = futures::StreamExt::next(stream) => { let msg = match msg { Err(tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)) => { *stream = connect_async(self.url.clone()).await?.0; self.await_channels(); return Ok(()); } _ => msg?, }; self.process_message(msg, stream).await; }); Ok(()) } pub fn subscribe_emote_set(&mut self, emote_set_id: String) { if self.joined_channels.contains(&emote_set_id) { return; } self.awaiting_channels.insert(emote_set_id); } pub fn unsubscribe_emote_set(&mut self, emote_set_id: String) { if !self.joined_channels.contains(&emote_set_id) { return; } self.awaiting_channels.insert(emote_set_id); } async fn process_message( &mut self, msg: Message, stream: &mut WebSocketStream>, ) { match msg { Message::Text(text) => { let text = text.to_string(); let json: serde_json::Value = serde_json::from_str(&text).expect("Error parsing JSON payload"); let operation_code = json["op"].as_i64().expect("No op code"); // unsupported operation if operation_code == 1 { self.listen_emote_sets(stream).await; self.identified = true; return; } else if operation_code != 0 { return; } let event_data = &json["d"]; if event_data["type"] .as_str() .expect("No d.type") .ne("emote_set.update") { return; } let event_data = &event_data["body"]; let channel = event_data["id"].as_str().expect("No body.id").to_string(); let actor_data = &event_data["actor"]; let author = Some( actor_data["id"] .as_str() .expect("No body.actor.id") .to_string(), ); if let (Some(pushed), Some(func)) = (event_data["pushed"].as_array(), &self.on_emote_create) { for emote in pushed { let emote = &emote["value"]; (func)(channel.clone(), author.clone(), self.create_emote(emote)); } } if let (Some(pulled), Some(func)) = (event_data["pulled"].as_array(), &self.on_emote_delete) { for emote in pulled { let emote = &emote["old_value"]; (func)(channel.clone(), author.clone(), self.create_emote(emote)); } } if let (Some(updated), Some(func)) = (event_data["updated"].as_array(), &self.on_emote_update) { for emote in updated { let old_emote = &emote["old_value"]; let emote = &emote["value"]; let id = old_emote["id"] .as_str() .expect("No old_value.id") .to_string(); let code = emote["name"].as_str().expect("No value.name").to_string(); let original_code = old_emote["name"] .as_str() .expect("No old_value.name") .to_string(); let emote = Emote { id, original_code: if code.ne(&original_code) { Some(original_code) } else { None }, code, }; (func)(channel.clone(), author.clone(), emote); } } } _ => {} } } fn create_emote(&self, value: &Value) -> Emote { let id = value["id"].as_str().expect("No value.id").to_string(); let code = value["name"].as_str().expect("No value.name").to_string(); let original_code = value["data"]["name"] .as_str() .expect("No value.data.name") .to_string(); Emote { id, original_code: if code.ne(&original_code) { Some(original_code) } else { None }, code, } } async fn listen_emote_sets(&mut self, stream: &mut WebSocketStream>) { for id in &self.awaiting_channels { let json = if self.joined_channels.contains(id) { self.joined_channels.remove(id); serde_json::json!({ "op": 36, "d": { "type": "emote_set.update", "condition": { "object_id": id } } }) } else { self.joined_channels.insert(id.clone()); serde_json::json!({ "op": 35, "d": { "type": "emote_set.update", "condition": { "object_id": id } } }) }; stream .send(Message::Text( serde_json::to_string(&json) .expect("Error converting JSON to String") .into(), )) .await .expect("Error sending join/part request"); } self.awaiting_channels.clear(); } fn await_channels(&mut self) { let c = self.joined_channels.clone(); self.awaiting_channels.extend(c); self.joined_channels.clear(); } }