summaryrefslogtreecommitdiff
path: root/src/betterttv.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/betterttv.rs')
-rw-r--r--src/betterttv.rs197
1 files changed, 196 insertions, 1 deletions
diff --git a/src/betterttv.rs b/src/betterttv.rs
index 1fc99b2..07dae2c 100644
--- a/src/betterttv.rs
+++ b/src/betterttv.rs
@@ -1,8 +1,14 @@
+use std::collections::HashSet;
+
+use futures::SinkExt;
use reqwest::{Client, Error};
use serde::Deserialize;
use serde_json::Value;
+use tokio::net::TcpStream;
+use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Result};
+use tungstenite::Message;
-use crate::emotes::{EmoteBase, RetrieveEmoteAPI};
+use crate::emotes::{Emote, EmoteBase, RetrieveEmoteAPI, RetrieveEmoteWS};
#[derive(Debug, Deserialize, Clone)]
pub struct BetterTTVEmote {
@@ -87,3 +93,192 @@ impl RetrieveEmoteAPI<BetterTTVEmote> for BetterTTVAPIClient {
Ok(serde_json::from_value(json).unwrap())
}
}
+
+pub struct BetterTTVWSClient {
+ url: String,
+ on_emote_create: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>,
+ on_emote_update: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>,
+ on_emote_delete: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>,
+
+ joined_channels: HashSet<usize>,
+ awaiting_channels: HashSet<usize>,
+}
+
+impl BetterTTVWSClient {
+ pub async fn new() -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Self)> {
+ let url = "wss://sockets.betterttv.net/ws";
+
+ let s = connect_async(url).await?;
+
+ Ok((
+ s.0,
+ Self {
+ url: url.to_string(),
+ on_emote_create: None,
+ on_emote_delete: None,
+ on_emote_update: None,
+ joined_channels: HashSet::new(),
+ awaiting_channels: HashSet::new(),
+ },
+ ))
+ }
+
+ pub async fn process(
+ &mut self,
+ stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
+ ) -> Result<()> {
+ self.join_channels(stream).await;
+
+ tokio::select!(Some(msg) = futures::StreamExt::next(stream) => {
+ let msg = match msg {
+ Err(tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)) => {
+ *stream = connect_async(self.url.clone()).await?.0;
+ self.await_channels();
+ return Ok(());
+ }
+ _ => msg?,
+ };
+
+ self.process_message(msg);
+ });
+
+ Ok(())
+ }
+
+ pub fn join_channel(&mut self, twitch_id: usize) {
+ if self.awaiting_channels.contains(&twitch_id) || self.joined_channels.contains(&twitch_id)
+ {
+ return;
+ }
+
+ self.awaiting_channels.insert(twitch_id);
+ }
+
+ fn process_message(&self, msg: Message) {
+ match msg {
+ Message::Text(text) => {
+ let text = text.to_string();
+
+ let json: serde_json::Value =
+ serde_json::from_str(&text).expect("Error parsing JSON payload");
+
+ let event_data = &json["data"];
+
+ let event_name = json["name"].as_str().expect("No event name");
+
+ if event_name.eq("emote_create") {
+ if let Some(func) = &self.on_emote_create {
+ let emote_data = &event_data["emote"];
+ let channel_data = event_data["channel"]
+ .as_str()
+ .expect("No channel")
+ .to_string()
+ .clone();
+
+ let emote = Emote {
+ id: emote_data["id"].as_str().expect("No emote.id").to_string(),
+ code: emote_data["code"]
+ .as_str()
+ .expect("No emote.code")
+ .to_string(),
+ original_code: None,
+ };
+
+ (func)(channel_data, None, emote);
+ }
+ } else if event_name.eq("emote_update") {
+ if let Some(func) = &self.on_emote_update {
+ let emote_data = &event_data["emote"];
+ let channel_data = event_data["channel"]
+ .as_str()
+ .expect("No channel")
+ .to_string()
+ .clone();
+
+ let emote = Emote {
+ id: emote_data["id"].as_str().expect("No emote.id").to_string(),
+ code: emote_data["code"]
+ .as_str()
+ .expect("No emote.code")
+ .to_string(),
+ original_code: None,
+ };
+
+ (func)(channel_data, None, emote);
+ }
+ } else if event_name.eq("emote_delete") {
+ if let Some(func) = &self.on_emote_delete {
+ let emote_id = &event_data["emoteId"];
+ let channel_data = event_data["channel"]
+ .as_str()
+ .expect("No channel")
+ .to_string()
+ .clone();
+
+ let emote = Emote {
+ id: emote_id.as_str().expect("No emoteId").to_string(),
+ code: "".into(),
+ original_code: None,
+ };
+
+ (func)(channel_data, None, emote);
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+
+ async fn join_channels(&mut self, stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>) {
+ for id in &self.awaiting_channels {
+ let json = serde_json::json!({
+ "name": "join_channel",
+ "data": {
+ "name": format!("twitch:{}", id)
+ }
+ });
+
+ stream
+ .send(Message::Text(
+ serde_json::to_string(&json)
+ .expect("Error converting JSON to String")
+ .into(),
+ ))
+ .await
+ .expect("Error sending join request");
+
+ self.joined_channels.insert(*id);
+ }
+
+ self.awaiting_channels.clear();
+ }
+
+ fn await_channels(&mut self) {
+ let c = self.joined_channels.clone();
+ self.awaiting_channels.extend(c);
+ self.joined_channels.clear();
+ }
+}
+
+impl RetrieveEmoteWS<Emote> for BetterTTVWSClient {
+ fn on_emote_create(
+ &mut self,
+ func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync),
+ ) {
+ self.on_emote_create = Some(Box::new(func));
+ }
+
+ fn on_emote_update(
+ &mut self,
+ func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync),
+ ) {
+ self.on_emote_update = Some(Box::new(func));
+ }
+
+ fn on_emote_delete(
+ &mut self,
+ func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync),
+ ) {
+ self.on_emote_delete = Some(Box::new(func));
+ }
+}