use std::collections::HashSet; use futures::SinkExt; use reqwest::{Client, Error}; use serde_json::Value; use tokio::net::TcpStream; use tokio_tungstenite::{ MaybeTlsStream, WebSocketStream, connect_async, connect_async_with_config, }; use tungstenite::{Message, Result, protocol::WebSocketConfig}; use crate::emotes::{Emote, RetrieveEmoteAPI, RetrieveEmoteWS}; pub struct SevenTVAPIClient { client: Client, base_url: String, } impl SevenTVAPIClient { pub fn new() -> Self { Self { client: Client::new(), base_url: "https://7tv.io/v3".into(), } } fn parse_emoteset(&self, emotesets_json: &Value) -> Vec { let mut emotes = Vec::new(); let emote_values = emotesets_json.get("emotes").unwrap().as_array().unwrap(); for emote_value in emote_values { let id = emote_value.get("id").unwrap().as_str().unwrap().to_string(); let code = emote_value .get("name") .unwrap() .as_str() .unwrap() .to_string(); let o_code = emote_value .get("data") .unwrap() .get("name") .unwrap() .to_string(); let original_code: Option = if code.eq(&o_code) { None } else { Some(o_code) }; emotes.push(Emote { id, code, original_code, }); } emotes } } impl RetrieveEmoteAPI for SevenTVAPIClient { async fn get_channel_emotes(&self, channel_id: &str) -> Result, Error> { let response = self .client .get(format!("{}/users/twitch/{}", self.base_url, channel_id)) .send() .await? .error_for_status()?; let json: Value = response.json().await?; let set = json.get("emote_set").unwrap(); let emotes = self.parse_emoteset(set); Ok(emotes) } async fn get_global_emotes(&self) -> Result, Error> { let response = self .client .get(format!("{}/emote-sets/global", self.base_url)) .send() .await? .error_for_status()?; let json: Value = response.json().await?; let emotes = self.parse_emoteset(&json); Ok(emotes) } } pub struct SevenTVWSClient { url: String, on_emote_create: Option, Emote) + Send + Sync>>, on_emote_update: Option, Emote) + Send + Sync>>, on_emote_delete: Option, Emote) + Send + Sync>>, joined_channels: HashSet, awaiting_channels: HashSet, identified: bool, } impl SevenTVWSClient { pub async fn new() -> Result<(WebSocketStream>, Self)> { let url = "wss://events.7tv.io/v3"; let config = WebSocketConfig::default(); let (socket, _) = connect_async_with_config(url, Some(config), false).await?; Ok(( socket, Self { url: url.to_string(), on_emote_create: None, on_emote_delete: None, on_emote_update: None, joined_channels: HashSet::new(), awaiting_channels: HashSet::new(), identified: false, }, )) } pub async fn process( &mut self, stream: &mut WebSocketStream>, ) -> Result<()> { if self.identified { self.join_channels(stream).await; } tokio::select!(Some(msg) = futures::StreamExt::next(stream) => { let msg = match msg { Err(tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)) => { *stream = connect_async(self.url.clone()).await?.0; self.await_channels(); return Ok(()); } _ => msg?, }; self.process_message(msg, stream).await; }); Ok(()) } pub fn join_channel(&mut self, twitch_id: String) { if self.awaiting_channels.contains(&twitch_id) || self.joined_channels.contains(&twitch_id) { return; } self.awaiting_channels.insert(twitch_id); } async fn process_message( &mut self, msg: Message, stream: &mut WebSocketStream>, ) { match msg { Message::Text(text) => { let text = text.to_string(); let json: serde_json::Value = serde_json::from_str(&text).expect("Error parsing JSON payload"); let operation_code = json["op"].as_i64().expect("No op code"); // unsupported operation if operation_code == 1 { self.join_channels(stream).await; self.identified = true; return; } else if operation_code != 0 { return; } let event_data = &json["d"]; if event_data["type"] .as_str() .expect("No d.type") .ne("emote_set.update") { return; } let event_data = &event_data["body"]; let channel = event_data["id"].as_str().expect("No body.id").to_string(); let actor_data = &event_data["actor"]; let author = Some( actor_data["id"] .as_str() .expect("No body.actor.id") .to_string(), ); if let (Some(pushed), Some(func)) = (event_data["pushed"].as_array(), &self.on_emote_create) { for emote in pushed { let emote = &emote["value"]; (func)(channel.clone(), author.clone(), self.create_emote(emote)); } } if let (Some(pulled), Some(func)) = (event_data["pulled"].as_array(), &self.on_emote_delete) { for emote in pulled { let emote = &emote["old_value"]; (func)(channel.clone(), author.clone(), self.create_emote(emote)); } } if let (Some(updated), Some(func)) = (event_data["updated"].as_array(), &self.on_emote_update) { for emote in updated { let old_emote = &emote["old_value"]; let emote = &emote["value"]; let id = old_emote["id"] .as_str() .expect("No old_value.id") .to_string(); let code = emote["name"].as_str().expect("No value.name").to_string(); let original_code = old_emote["name"] .as_str() .expect("No old_value.name") .to_string(); let emote = Emote { id, original_code: if code.ne(&original_code) { Some(original_code) } else { None }, code, }; (func)(channel.clone(), author.clone(), emote); } } } _ => {} } } fn create_emote(&self, value: &Value) -> Emote { let id = value["id"].as_str().expect("No value.id").to_string(); let code = value["name"].as_str().expect("No value.name").to_string(); let original_code = value["data"]["name"] .as_str() .expect("No value.data.name") .to_string(); Emote { id, original_code: if code.ne(&original_code) { Some(original_code) } else { None }, code, } } async fn join_channels(&mut self, stream: &mut WebSocketStream>) { for id in &self.awaiting_channels { let json = serde_json::json!({ "op": 35, "d": { "type": "emote_set.update", "condition": { "object_id": id } } }); stream .send(Message::Text( serde_json::to_string(&json) .expect("Error converting JSON to String") .into(), )) .await .expect("Error sending join request"); self.joined_channels.insert(id.clone()); } self.awaiting_channels.clear(); } fn await_channels(&mut self) { let c = self.joined_channels.clone(); self.awaiting_channels.extend(c); self.joined_channels.clear(); } } impl RetrieveEmoteWS for SevenTVWSClient { fn on_emote_create( &mut self, func: &'static (dyn Fn(String, Option, Emote) + Send + Sync), ) { self.on_emote_create = Some(Box::new(func)); } fn on_emote_update( &mut self, func: &'static (dyn Fn(String, Option, Emote) + Send + Sync), ) { self.on_emote_update = Some(Box::new(func)); } fn on_emote_delete( &mut self, func: &'static (dyn Fn(String, Option, Emote) + Send + Sync), ) { self.on_emote_delete = Some(Box::new(func)); } }