summaryrefslogtreecommitdiff
path: root/bot/src/stream.cpp
diff options
context:
space:
mode:
authorilotterytea <iltsu@alright.party>2024-05-18 14:48:12 +0500
committerilotterytea <iltsu@alright.party>2024-05-18 14:48:12 +0500
commitd1793df1eda463b10107d41785ad1d7f055ed476 (patch)
treefd3e41c3b4a05924748ae4b762e1ae55a0bc815c /bot/src/stream.cpp
parentd7a2de17e9b7931f68b5b4079b1c36866a19d343 (diff)
upd: moved the bot part to a relative subfolder
Diffstat (limited to 'bot/src/stream.cpp')
-rw-r--r--bot/src/stream.cpp200
1 files changed, 200 insertions, 0 deletions
diff --git a/bot/src/stream.cpp b/bot/src/stream.cpp
new file mode 100644
index 0000000..6e48fb8
--- /dev/null
+++ b/bot/src/stream.cpp
@@ -0,0 +1,200 @@
+#include "stream.hpp"
+
+#include <algorithm>
+#include <chrono>
+#include <pqxx/pqxx>
+#include <set>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "api/twitch/schemas/stream.hpp"
+#include "config.hpp"
+#include "logger.hpp"
+#include "schemas/stream.hpp"
+#include "utils/string.hpp"
+
+namespace bot::stream {
+ void StreamListenerClient::listen_channel(const int &id) {
+ this->ids.push_back(id);
+ }
+ void StreamListenerClient::unlisten_channel(const int &id) {
+ auto x = std::find_if(this->ids.begin(), this->ids.end(),
+ [&](const auto &x) { return x == id; });
+
+ if (x != this->ids.end()) {
+ this->ids.erase(x);
+ }
+
+ auto y = std::find_if(this->online_ids.begin(), this->online_ids.end(),
+ [&](const auto &x) { return x == id; });
+
+ if (y != this->online_ids.end()) {
+ this->online_ids.erase(y);
+ }
+ }
+ void StreamListenerClient::run() {
+ while (true) {
+ this->update_channel_ids();
+ this->check();
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ }
+ }
+ void StreamListenerClient::check() {
+ auto streams = this->helix_client.get_streams(this->ids);
+ auto now = std::chrono::system_clock::now();
+ auto now_time_it = std::chrono::system_clock::to_time_t(now);
+ auto now_tm = std::gmtime(&now_time_it);
+ now = std::chrono::system_clock::from_time_t(std::mktime(now_tm));
+
+ // adding new ids
+ for (const auto &stream : streams) {
+ bool is_already_live =
+ std::any_of(this->online_ids.begin(), this->online_ids.end(),
+ [&](const auto &x) { return x == stream.get_user_id(); });
+
+ if (!is_already_live) {
+ this->online_ids.insert(stream.get_user_id());
+
+ auto difference = now - stream.get_started_at();
+ auto difference_min =
+ std::chrono::duration_cast<std::chrono::minutes>(difference);
+
+ if (difference_min.count() < 1) {
+ this->handler(schemas::EventType::LIVE, stream);
+ }
+ }
+ }
+
+ // removing old ids
+ for (auto i = this->online_ids.begin(); i != this->online_ids.end();) {
+ auto stream =
+ std::find_if(streams.begin(), streams.end(),
+ [&](const auto &x) { return x.get_user_id() == *i; });
+
+ if (stream == streams.end()) {
+ this->handler(schemas::EventType::OFFLINE,
+ api::twitch::schemas::Stream{*i});
+ i = this->online_ids.erase(i);
+ } else {
+ ++i;
+ }
+ }
+ }
+ void StreamListenerClient::handler(
+ const schemas::EventType &type,
+ const api::twitch::schemas::Stream &stream) {
+ pqxx::connection conn(GET_DATABASE_CONNECTION_URL(this->configuration));
+ pqxx::work work(conn);
+
+ pqxx::result events = work.exec(
+ "SELECT id, channel_id, message, flags FROM events WHERE event_type "
+ "= " +
+ std::to_string(type) +
+ " AND target_alias_id = " + std::to_string(stream.get_user_id()));
+
+ for (const auto &event : events) {
+ pqxx::row channel = work.exec1(
+ "SELECT alias_id, alias_name, opted_out_at FROM channels WHERE id "
+ "= " +
+ std::to_string(event[1].as<int>()));
+
+ if (!channel[2].is_null()) {
+ continue;
+ }
+
+ pqxx::result subs = work.exec(
+ "SELECT user_id FROM event_subscriptions WHERE event_id = " +
+ std::to_string(event[0].as<int>()));
+
+ std::set<std::string> user_ids;
+ if (!subs.empty()) {
+ for (const auto &sub : subs) {
+ user_ids.insert(std::to_string(sub[0].as<int>()));
+ }
+
+ pqxx::result users = work.exec(
+ "SELECT alias_name FROM users WHERE id IN (" +
+ utils::string::str(user_ids.begin(), user_ids.end(), ',') + ")");
+
+ user_ids.clear();
+
+ for (const auto &user : users) {
+ user_ids.insert(user[0].as<std::string>());
+ }
+ }
+
+ auto flags = event[3].as_array();
+ std::pair<pqxx::array_parser::juncture, std::string> elem;
+
+ do {
+ elem = flags.get_next();
+ if (elem.first == pqxx::array_parser::juncture::string_value) {
+ if (std::stoi(elem.second) == schemas::EventFlag::MASSPING) {
+ auto chatters = this->helix_client.get_chatters(
+ channel[0].as<int>(), this->irc_client.get_bot_id());
+
+ for (const auto &chatter : chatters) {
+ user_ids.insert(chatter.login);
+ }
+ }
+ }
+ } while (elem.first != pqxx::array_parser::juncture::done);
+
+ std::string base = "⚡ " + event[2].as<std::string>();
+ std::vector<std::string> msgs = {""};
+ int index = 0;
+
+ if (!user_ids.empty()) {
+ base.append(" · ");
+ }
+
+ for (const auto &user_id : user_ids) {
+ const std::string &current_msg = msgs.at(index);
+ std::string x = "@" + user_id;
+
+ if (base.length() + current_msg.length() + 1 + x.length() >= 500) {
+ index += 1;
+ }
+
+ if (index > msgs.size() - 1) {
+ msgs.push_back(x);
+ } else {
+ msgs[index] = current_msg + " " + x;
+ }
+ }
+
+ for (const auto &msg : msgs) {
+ this->irc_client.say(channel[1].as<std::string>(), base + msg);
+ }
+ }
+
+ work.commit();
+ conn.close();
+ }
+ void StreamListenerClient::update_channel_ids() {
+ pqxx::connection conn(GET_DATABASE_CONNECTION_URL(this->configuration));
+ pqxx::work work(conn);
+
+ pqxx::result ids =
+ work.exec("SELECT target_alias_id FROM events WHERE event_type < 99");
+
+ for (const auto &row : ids) {
+ int id = row[0].as<int>();
+
+ if (std::any_of(this->ids.begin(), this->ids.end(),
+ [&](const auto &x) { return x == id; })) {
+ continue;
+ }
+
+ log::info("TwitchStreamListener",
+ "Listening stream events for ID " + std::to_string(id));
+
+ this->ids.push_back(id);
+ }
+
+ work.commit();
+ conn.close();
+ }
+}