diff options
Diffstat (limited to 'src/betterttv.rs')
| -rw-r--r-- | src/betterttv.rs | 197 |
1 files changed, 196 insertions, 1 deletions
diff --git a/src/betterttv.rs b/src/betterttv.rs index 1fc99b2..07dae2c 100644 --- a/src/betterttv.rs +++ b/src/betterttv.rs @@ -1,8 +1,14 @@ +use std::collections::HashSet; + +use futures::SinkExt; use reqwest::{Client, Error}; use serde::Deserialize; use serde_json::Value; +use tokio::net::TcpStream; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Result}; +use tungstenite::Message; -use crate::emotes::{EmoteBase, RetrieveEmoteAPI}; +use crate::emotes::{Emote, EmoteBase, RetrieveEmoteAPI, RetrieveEmoteWS}; #[derive(Debug, Deserialize, Clone)] pub struct BetterTTVEmote { @@ -87,3 +93,192 @@ impl RetrieveEmoteAPI<BetterTTVEmote> for BetterTTVAPIClient { Ok(serde_json::from_value(json).unwrap()) } } + +pub struct BetterTTVWSClient { + url: String, + on_emote_create: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + on_emote_update: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + on_emote_delete: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>, + + joined_channels: HashSet<usize>, + awaiting_channels: HashSet<usize>, +} + +impl BetterTTVWSClient { + pub async fn new() -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Self)> { + let url = "wss://sockets.betterttv.net/ws"; + + let s = connect_async(url).await?; + + Ok(( + s.0, + Self { + url: url.to_string(), + on_emote_create: None, + on_emote_delete: None, + on_emote_update: None, + joined_channels: HashSet::new(), + awaiting_channels: HashSet::new(), + }, + )) + } + + pub async fn process( + &mut self, + stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>, + ) -> Result<()> { + self.join_channels(stream).await; + + tokio::select!(Some(msg) = futures::StreamExt::next(stream) => { + let msg = match msg { + Err(tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)) => { + *stream = connect_async(self.url.clone()).await?.0; + self.await_channels(); + return Ok(()); + } + _ => msg?, + }; + + self.process_message(msg); + }); + + Ok(()) + } + + pub fn join_channel(&mut self, twitch_id: usize) { + if self.awaiting_channels.contains(&twitch_id) || self.joined_channels.contains(&twitch_id) + { + return; + } + + self.awaiting_channels.insert(twitch_id); + } + + fn process_message(&self, msg: Message) { + match msg { + Message::Text(text) => { + let text = text.to_string(); + + let json: serde_json::Value = + serde_json::from_str(&text).expect("Error parsing JSON payload"); + + let event_data = &json["data"]; + + let event_name = json["name"].as_str().expect("No event name"); + + if event_name.eq("emote_create") { + if let Some(func) = &self.on_emote_create { + let emote_data = &event_data["emote"]; + let channel_data = event_data["channel"] + .as_str() + .expect("No channel") + .to_string() + .clone(); + + let emote = Emote { + id: emote_data["id"].as_str().expect("No emote.id").to_string(), + code: emote_data["code"] + .as_str() + .expect("No emote.code") + .to_string(), + original_code: None, + }; + + (func)(channel_data, None, emote); + } + } else if event_name.eq("emote_update") { + if let Some(func) = &self.on_emote_update { + let emote_data = &event_data["emote"]; + let channel_data = event_data["channel"] + .as_str() + .expect("No channel") + .to_string() + .clone(); + + let emote = Emote { + id: emote_data["id"].as_str().expect("No emote.id").to_string(), + code: emote_data["code"] + .as_str() + .expect("No emote.code") + .to_string(), + original_code: None, + }; + + (func)(channel_data, None, emote); + } + } else if event_name.eq("emote_delete") { + if let Some(func) = &self.on_emote_delete { + let emote_id = &event_data["emoteId"]; + let channel_data = event_data["channel"] + .as_str() + .expect("No channel") + .to_string() + .clone(); + + let emote = Emote { + id: emote_id.as_str().expect("No emoteId").to_string(), + code: "".into(), + original_code: None, + }; + + (func)(channel_data, None, emote); + } + } + } + _ => {} + } + } + + async fn join_channels(&mut self, stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>) { + for id in &self.awaiting_channels { + let json = serde_json::json!({ + "name": "join_channel", + "data": { + "name": format!("twitch:{}", id) + } + }); + + stream + .send(Message::Text( + serde_json::to_string(&json) + .expect("Error converting JSON to String") + .into(), + )) + .await + .expect("Error sending join request"); + + self.joined_channels.insert(*id); + } + + self.awaiting_channels.clear(); + } + + fn await_channels(&mut self) { + let c = self.joined_channels.clone(); + self.awaiting_channels.extend(c); + self.joined_channels.clear(); + } +} + +impl RetrieveEmoteWS<Emote> for BetterTTVWSClient { + fn on_emote_create( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_create = Some(Box::new(func)); + } + + fn on_emote_update( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_update = Some(Box::new(func)); + } + + fn on_emote_delete( + &mut self, + func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync), + ) { + self.on_emote_delete = Some(Box::new(func)); + } +} |
