summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorilotterytea <iltsu@alright.party>2025-04-08 16:38:04 +0500
committerilotterytea <iltsu@alright.party>2025-04-08 16:38:04 +0500
commit317df3c75633dcb74ab89684d02b6205dec91d1e (patch)
treeb61897fa339bedc55d597485a0f7f50bbd3940bd
parentbf2de4e8ac6226b3c0dcad57a6f7c68c6449ac76 (diff)
feat: 7tv websocket client
-rw-r--r--src/seventv.rs256
1 files changed, 255 insertions, 1 deletions
diff --git a/src/seventv.rs b/src/seventv.rs
index 97d462b..4ca225a 100644
--- a/src/seventv.rs
+++ b/src/seventv.rs
@@ -1,7 +1,15 @@
+use std::collections::HashSet;
+
+use futures::SinkExt;
use reqwest::{Client, Error};
use serde_json::Value;
+use tokio::net::TcpStream;
+use tokio_tungstenite::{
+ MaybeTlsStream, WebSocketStream, connect_async, connect_async_with_config,
+};
+use tungstenite::{Message, Result, protocol::WebSocketConfig};
-use crate::emotes::{Emote, RetrieveEmoteAPI};
+use crate::emotes::{Emote, RetrieveEmoteAPI, RetrieveEmoteWS};
pub struct SevenTVAPIClient {
client: Client,
@@ -82,3 +90,249 @@ impl RetrieveEmoteAPI<Emote> for SevenTVAPIClient {
Ok(emotes)
}
}
+
+pub struct SevenTVWSClient {
+ url: String,
+ on_emote_create: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>,
+ on_emote_update: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>,
+ on_emote_delete: Option<Box<dyn Fn(String, Option<String>, Emote) + Send + Sync>>,
+
+ joined_channels: HashSet<String>,
+ awaiting_channels: HashSet<String>,
+
+ identified: bool,
+}
+
+impl SevenTVWSClient {
+ pub async fn new() -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Self)> {
+ let url = "wss://events.7tv.io/v3";
+
+ let config = WebSocketConfig::default();
+
+ let (socket, _) = connect_async_with_config(url, Some(config), false).await?;
+
+ Ok((
+ socket,
+ Self {
+ url: url.to_string(),
+ on_emote_create: None,
+ on_emote_delete: None,
+ on_emote_update: None,
+ joined_channels: HashSet::new(),
+ awaiting_channels: HashSet::new(),
+ identified: false,
+ },
+ ))
+ }
+
+ pub async fn process(
+ &mut self,
+ stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
+ ) -> Result<()> {
+ if self.identified {
+ self.join_channels(stream).await;
+ }
+
+ tokio::select!(Some(msg) = futures::StreamExt::next(stream) => {
+ let msg = match msg {
+ Err(tungstenite::Error::Protocol(tungstenite::error::ProtocolError::ResetWithoutClosingHandshake)) => {
+ *stream = connect_async(self.url.clone()).await?.0;
+ self.await_channels();
+ return Ok(());
+ }
+ _ => msg?,
+ };
+
+ self.process_message(msg, stream).await;
+ });
+
+ Ok(())
+ }
+
+ pub fn join_channel(&mut self, twitch_id: String) {
+ if self.awaiting_channels.contains(&twitch_id) || self.joined_channels.contains(&twitch_id)
+ {
+ return;
+ }
+
+ self.awaiting_channels.insert(twitch_id);
+ }
+
+ async fn process_message(
+ &mut self,
+ msg: Message,
+ stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
+ ) {
+ match msg {
+ Message::Text(text) => {
+ let text = text.to_string();
+
+ let json: serde_json::Value =
+ serde_json::from_str(&text).expect("Error parsing JSON payload");
+
+ let operation_code = json["op"].as_i64().expect("No op code");
+
+ // unsupported operation
+ if operation_code == 1 {
+ self.join_channels(stream).await;
+ self.identified = true;
+ return;
+ } else if operation_code != 0 {
+ return;
+ }
+
+ let event_data = &json["d"];
+
+ if event_data["type"]
+ .as_str()
+ .expect("No d.type")
+ .ne("emote_set.update")
+ {
+ return;
+ }
+
+ let event_data = &event_data["body"];
+
+ let channel = event_data["id"].as_str().expect("No body.id").to_string();
+
+ let actor_data = &event_data["actor"];
+
+ let author = Some(
+ actor_data["id"]
+ .as_str()
+ .expect("No body.actor.id")
+ .to_string(),
+ );
+
+ if let (Some(pushed), Some(func)) =
+ (event_data["pushed"].as_array(), &self.on_emote_create)
+ {
+ for emote in pushed {
+ let emote = &emote["value"];
+ (func)(channel.clone(), author.clone(), self.create_emote(emote));
+ }
+ }
+
+ if let (Some(pulled), Some(func)) =
+ (event_data["pulled"].as_array(), &self.on_emote_delete)
+ {
+ for emote in pulled {
+ let emote = &emote["old_value"];
+ (func)(channel.clone(), author.clone(), self.create_emote(emote));
+ }
+ }
+
+ if let (Some(updated), Some(func)) =
+ (event_data["updated"].as_array(), &self.on_emote_update)
+ {
+ for emote in updated {
+ let old_emote = &emote["old_value"];
+ let emote = &emote["value"];
+
+ let id = old_emote["id"]
+ .as_str()
+ .expect("No old_value.id")
+ .to_string();
+
+ let code = emote["name"].as_str().expect("No value.name").to_string();
+
+ let original_code = old_emote["name"]
+ .as_str()
+ .expect("No old_value.name")
+ .to_string();
+
+ let emote = Emote {
+ id,
+ original_code: if code.ne(&original_code) {
+ Some(original_code)
+ } else {
+ None
+ },
+ code,
+ };
+
+ (func)(channel.clone(), author.clone(), emote);
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+
+ fn create_emote(&self, value: &Value) -> Emote {
+ let id = value["id"].as_str().expect("No value.id").to_string();
+
+ let code = value["name"].as_str().expect("No value.name").to_string();
+
+ let original_code = value["data"]["name"]
+ .as_str()
+ .expect("No value.data.name")
+ .to_string();
+
+ Emote {
+ id,
+ original_code: if code.ne(&original_code) {
+ Some(original_code)
+ } else {
+ None
+ },
+ code,
+ }
+ }
+
+ async fn join_channels(&mut self, stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>) {
+ for id in &self.awaiting_channels {
+ let json = serde_json::json!({
+ "op": 35,
+ "d": {
+ "type": "emote_set.update",
+ "condition": {
+ "object_id": id
+ }
+ }
+ });
+
+ stream
+ .send(Message::Text(
+ serde_json::to_string(&json)
+ .expect("Error converting JSON to String")
+ .into(),
+ ))
+ .await
+ .expect("Error sending join request");
+
+ self.joined_channels.insert(id.clone());
+ }
+
+ self.awaiting_channels.clear();
+ }
+
+ fn await_channels(&mut self) {
+ let c = self.joined_channels.clone();
+ self.awaiting_channels.extend(c);
+ self.joined_channels.clear();
+ }
+}
+
+impl RetrieveEmoteWS<Emote> for SevenTVWSClient {
+ fn on_emote_create(
+ &mut self,
+ func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync),
+ ) {
+ self.on_emote_create = Some(Box::new(func));
+ }
+
+ fn on_emote_update(
+ &mut self,
+ func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync),
+ ) {
+ self.on_emote_update = Some(Box::new(func));
+ }
+
+ fn on_emote_delete(
+ &mut self,
+ func: &'static (dyn Fn(String, Option<String>, Emote) + Send + Sync),
+ ) {
+ self.on_emote_delete = Some(Box::new(func));
+ }
+}