use std::{collections::HashSet, sync::Arc}; use futures::SinkExt; use reqwest::{Client, Error}; use serde::Deserialize; use serde_json::Value; use tokio::net::TcpStream; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Result}; use tungstenite::Message; use crate::emotes::{Emote, EmoteBase, RetrieveEmoteAPI, RetrieveEmoteHandler, RetrieveEmoteWS}; #[derive(Debug, Deserialize, Clone)] pub struct BetterTTVEmote { pub id: String, pub code: String, #[serde(rename = "originalCode")] pub original_code: Option, #[serde(rename = "imageType")] pub image_type: String, pub animated: bool, } impl EmoteBase for BetterTTVEmote { fn get_id(&self) -> &String { &self.id } fn get_code(&self) -> &String { &self.code } fn get_original_code(&self) -> &Option { &self.original_code } } pub struct BetterTTVAPIClient { client: Client, base_url: String, } impl BetterTTVAPIClient { pub fn new() -> Self { Self { client: Client::new(), base_url: "https://api.betterttv.net/3".into(), } } } impl RetrieveEmoteAPI for BetterTTVAPIClient { async fn get_channel_emotes(&self, channel_id: &str) -> Result, Error> { let response = self .client .get(format!( "{}/cached/users/twitch/{}", self.base_url, channel_id )) .send() .await? .error_for_status()?; let json: Value = response.json().await?; let mut emotes = Vec::new(); if let Some(shared_emotes) = json.get("sharedEmotes") { let shared_emotes: Vec = serde_json::from_value(shared_emotes.clone()).unwrap(); emotes.extend(shared_emotes); } if let Some(channel_emotes) = json.get("channelEmotes") { let channel_emotes: Vec = serde_json::from_value(channel_emotes.clone()).unwrap(); emotes.extend(channel_emotes); } Ok(emotes) } async fn get_global_emotes(&self) -> Result, Error> { let response = self .client .get(format!("{}/cached/emotes/global", self.base_url)) .send() .await? .error_for_status()?; let json: Value = response.json().await?; Ok(serde_json::from_value(json).unwrap()) } } pub struct BetterTTVWSClient { url: String, on_emote_create: Option>, on_emote_update: Option>, on_emote_delete: Option>, joined_channels: HashSet, awaiting_channels: HashSet, } impl RetrieveEmoteWS for BetterTTVWSClient { fn on_emote_create( &mut self, func: impl Fn(String, Option, Emote) + Send + Sync + 'static, ) { self.on_emote_create = Some(Arc::new(func)); } fn on_emote_update( &mut self, func: impl Fn(String, Option, Emote) + Send + Sync + 'static, ) { self.on_emote_update = Some(Arc::new(func)); } fn on_emote_delete( &mut self, func: impl Fn(String, Option, Emote) + Send + Sync + 'static, ) { self.on_emote_delete = Some(Arc::new(func)); } } impl BetterTTVWSClient { pub async fn new() -> Result<(WebSocketStream>, Self)> { let url = "wss://sockets.betterttv.net/ws"; let s = connect_async(url).await?; Ok(( s.0, Self { url: url.to_string(), on_emote_create: None, on_emote_delete: None, on_emote_update: None, joined_channels: HashSet::new(), awaiting_channels: HashSet::new(), }, )) } pub async fn process( &mut self, stream: &mut WebSocketStream>, ) -> Result<()> { self.join_channels(stream).await; tokio::select!(Some(msg) = futures::StreamExt::next(stream) => { let msg = match msg { Err(tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)) => { *stream = connect_async(self.url.clone()).await?.0; self.await_channels(); return Ok(()); } _ => msg?, }; self.process_message(msg); }); Ok(()) } pub fn join_channel(&mut self, twitch_id: usize) { if self.joined_channels.contains(&twitch_id) { return; } self.awaiting_channels.insert(twitch_id); } pub fn part_channel(&mut self, twitch_id: usize) { if !self.joined_channels.contains(&twitch_id) { return; } self.awaiting_channels.insert(twitch_id); } fn process_message(&self, msg: Message) { match msg { Message::Text(text) => { let text = text.to_string(); let json: serde_json::Value = serde_json::from_str(&text).expect("Error parsing JSON payload"); let event_data = &json["data"]; let event_name = json["name"].as_str().expect("No event name"); if event_name.eq("emote_create") { if let Some(func) = &self.on_emote_create { let emote_data = &event_data["emote"]; let channel_data = event_data["channel"] .as_str() .expect("No channel") .to_string() .clone(); let emote = Emote { id: emote_data["id"].as_str().expect("No emote.id").to_string(), code: emote_data["code"] .as_str() .expect("No emote.code") .to_string(), original_code: None, }; (func)(channel_data, None, emote); } } else if event_name.eq("emote_update") { if let Some(func) = &self.on_emote_update { let emote_data = &event_data["emote"]; let channel_data = event_data["channel"] .as_str() .expect("No channel") .to_string() .clone(); let emote = Emote { id: emote_data["id"].as_str().expect("No emote.id").to_string(), code: emote_data["code"] .as_str() .expect("No emote.code") .to_string(), original_code: None, }; (func)(channel_data, None, emote); } } else if event_name.eq("emote_delete") { if let Some(func) = &self.on_emote_delete { let emote_id = &event_data["emoteId"]; let channel_data = event_data["channel"] .as_str() .expect("No channel") .to_string() .clone(); let emote = Emote { id: emote_id.as_str().expect("No emoteId").to_string(), code: "".into(), original_code: None, }; (func)(channel_data, None, emote); } } } _ => {} } } async fn join_channels(&mut self, stream: &mut WebSocketStream>) { for id in &self.awaiting_channels { let json = if self.joined_channels.contains(id) { self.joined_channels.remove(id); serde_json::json!({ "name": "part_channel", "data": { "name": format!("twitch:{}", id) } }) } else { self.joined_channels.insert(*id); serde_json::json!({ "name": "join_channel", "data": { "name": format!("twitch:{}", id) } }) }; stream .send(Message::Text( serde_json::to_string(&json) .expect("Error converting JSON to String") .into(), )) .await .expect("Error sending join/part request"); } self.awaiting_channels.clear(); } fn await_channels(&mut self) { let c = self.joined_channels.clone(); self.awaiting_channels.extend(c); self.joined_channels.clear(); } }