Skip to content

Instantly share code, notes, and snippets.

@ammarfaizi2
Created July 25, 2021 18:07
Show Gist options
  • Save ammarfaizi2/5ed55f5dbe66c297839d176257b4041a to your computer and use it in GitHub Desktop.
Save ammarfaizi2/5ed55f5dbe66c297839d176257b4041a to your computer and use it in GitHub Desktop.
// SPDX-License-Identifier: GPL-2.0
/*
* @author Ammar Faizi <ammarfaizi2@gmail.com> https://www.facebook.com/ammarfaizi2
* @license GPL-2.0
* @package tgvisd::Main::Preload
*
* Copyright (C) 2021 Ammar Faizi <ammarfaizi2@gmail.com>
*/
#if defined(__linux__)
# include <pthread.h>
#endif
#include <errno.h>
#include <unistd.h>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <stack>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include <md5_php.h>
#include <sha1_php.h>
#include <tgvisd/DB.hpp>
#include "HistoryScraper.hpp"
namespace tgvisd::Modules::Preload {
using tgvisd::DBt::PreparedStatement;
using namespace std::chrono_literals;
/*
 * The module object itself carries no state to set up here.
 */
HistoryScraper::HistoryScraper()
{
}
/*
 * Worker - state and helpers used by the history scraper thread.
 *
 * A Worker borrows the Td client and the Main object (neither is owned) and
 * owns the database handle stored in db_, which is deleted by the destructor.
 * It also keeps in-memory caches that map Telegram ids to database row ids.
 */
class Worker
{
public:
	/*
	 * @param main  Application object; its Td client is cached in td_.
	 */
	explicit Worker(tgvisd::Main::Main *main);

	/*
	 * db_ is an owning raw pointer, so a copied Worker would delete it
	 * twice. Copying is therefore disabled.
	 */
	Worker(const Worker &) = delete;
	Worker &operator=(const Worker &) = delete;

	inline ~Worker(void)
	{
		/* Deleting a null pointer is a no-op; no check is required. */
		delete db_;
	}

	/* Entry point of the scraper thread. */
	void run(void);

	/* Forwards to Main::stopUpdate(). */
	inline bool stopUpdate(void)
	{
		return main_->stopUpdate();
	}

	/* Returns the (non-owned) Td client. */
	inline tgvisd::Td::Td *getTd(void)
	{
		return td_;
	}

	/* Message gathering. */
	void gatherMsg(void);
	void gatherMsgByChatId(int64_t chat_id);

	/* Thin synchronous wrappers around Td queries. */
	td_api::object_ptr<td_api::user> getUser(int64_t user_id);
	td_api::object_ptr<td_api::chat> getChat(int64_t chat_id);
	td_api::object_ptr<td_api::chats> getChats(void);
	td_api::object_ptr<td_api::messages> getChatHistory(int64_t chat_id);
	td_api::object_ptr<td_api::file> downloadFile(int32_t file_id);

	/* Chat event log (edited/deleted messages) processing. */
	void gatherChatEventLog(int64_t chat_id);
	int64_t processChatEventLog(td_api::object_ptr<td_api::chatEvents> events);
	void processMessage(td_api::message &msg, bool is_edited, bool is_deleted);

	/*
	 * resolve*() map a Telegram object to its database row id, inserting
	 * the row when it does not exist yet.
	 */
	uint64_t resolveUser(td_api::user &user, td_api::user **user_p);
	int64_t resolveGroup(td_api::chat &chat, td_api::chat **chat_p);
	uint64_t resolveMessage(td_api::message &msg, uint64_t db_user_id,
				uint64_t db_group_id, bool is_edited,
				bool is_deleted);
	uint64_t resolveFile(tgvisd::DB *db, td_api::file &file);

	/* Runs SELECT LAST_INSERT_ID() on db_. */
	uint64_t lastInsertId(void);

private:
	tgvisd::DB *db_ = nullptr;		/* owned, deleted in ~Worker() */
	tgvisd::Td::Td *td_ = nullptr;		/* borrowed from main_ */
	tgvisd::Main::Main *main_ = nullptr;	/* borrowed */

	/* Telegram id -> database row id caches. */
	std::unordered_map<std::int64_t, std::uint64_t> userTgToDb_;
	std::unordered_map<std::int64_t, std::uint64_t> groupTgToDb_;

	/* Cached Td objects, filled by resolveUser() / resolveGroup(). */
	std::unordered_map<std::uint64_t, td_api::user> userMap_;
	std::unordered_map<std::int64_t, td_api::chat> groupMap_;

	void saveForwardInfo(uint64_t db_msg_id, td_api::messageForwardInfo &fwd);

	/* Per-content-type writers for the gw_group_message_data table. */
	void insertMsgData(uint64_t db_msg_id, td_api::message &msg,
			   bool is_edited);
	void insertMsgDataText(uint64_t db_msg_id, td_api::message &msg,
			       bool is_edited);
	void insertMsgDataPhoto(uint64_t db_msg_id, td_api::message &msg,
				bool is_edited);
	void insertMsgDataSticker(uint64_t db_msg_id, td_api::message &msg,
				  bool is_edited);
	void insertMsgDataVideo(uint64_t db_msg_id, td_api::message &msg,
				bool is_edited);
	void insertMsgDataAnimation(uint64_t db_msg_id, td_api::message &msg,
				    bool is_edited);
	void insertMsgDataDocument(uint64_t db_msg_id, td_api::message &msg,
				   bool is_edited);
	void insertMsgDataAudio(uint64_t db_msg_id, td_api::message &msg,
				bool is_edited);
	void insertMsgData__file(uint64_t db_msg_id, td_api::message &msg,
				 const char *text, const char *text_entities,
				 td_api::file &file, bool is_edited);

	/* Records an event id; returns false when the insert fails. */
	bool trackEventId(int64_t event_id);

	uint64_t getDBMsgIdIfExists(uint64_t tg_msg_id, uint64_t chat_id,
				    bool *has_edited_msg);
	void setIsDeletedToTrue(uint64_t db_msg_id, uint64_t db_group_id);
	void setHasEditedToTrue(uint64_t db_msg_id, uint64_t db_group_id);
};
/*
 * Start the history scraper on its own detached thread.
 *
 * A reference is taken on `main` before the thread starts and released by
 * the thread once the Worker has finished and been destroyed.
 *
 * @param main  Application object handed to the Worker.
 * @return      Always MBP_CONTINUE.
 */
mbp_ret_t HistoryScraper::initPreload(tgvisd::Main::Main *main)
{
	main->incRef();

	std::thread worker([main]() {
		{
			/*
			 * Scoped object instead of new/delete: the Worker is
			 * destroyed at the closing brace, i.e. before the
			 * reference on `main` is dropped.
			 */
			Worker wrk(main);
			wrk.run();
		}
		main->decRef();
	});

#if defined(__linux__)
	/* Give the thread a recognisable name. */
	pthread_setname_np(worker.native_handle(), "history-scraper");
#endif

	worker.detach();
	return MBP_CONTINUE;
}
/*
 * Cache the Td client of `main`; a missing client is a programming error.
 * Initialisers are listed in member declaration order (td_ before main_).
 */
Worker::Worker(tgvisd::Main::Main *main):
	td_(main->getTd()),
	main_(main)
{
	assert(td_);
}
/*
 * Synchronously fetch a chat list from Td.
 *
 * NOTE(review): the four arguments look like (chat_list, offset_order,
 * offset_chat_id, limit) - confirm against the bundled TDLib version.
 *
 * @return  The td_api::chats object, or whatever send_query_sync() yields on
 *          failure (callers check for null).
 */
td_api::object_ptr<td_api::chats> Worker::getChats(void)
{
	auto query = td_api::make_object<td_api::getChats>(
		nullptr,
		std::numeric_limits<std::int64_t>::max(),
		0,
		100
	);

	return td_->send_query_sync<td_api::getChats, td_api::chats>(
		std::move(query)
	);
}
/*
 * Synchronously fetch up to 100 messages of `chat_id`, starting from message
 * id 0 with offset 0, without restricting the query to the local cache.
 *
 * @param chat_id  Telegram chat identifier.
 * @return         The td_api::messages object (callers check for null).
 */
td_api::object_ptr<td_api::messages> Worker::getChatHistory(int64_t chat_id)
{
	const int64_t start_msg_id = 0;
	const int32_t start_offset = 0;
	const int32_t max_messages = 100;
	const bool local_only = false;

	auto query = td_api::make_object<td_api::getChatHistory>(
		chat_id,
		start_msg_id,
		start_offset,
		max_messages,
		local_only
	);

	return td_->send_query_sync<td_api::getChatHistory, td_api::messages>(
		std::move(query)
	);
}
/*
 * Synchronously look up a chat by id. The second argument passed to
 * send_query_sync() is 30 (presumably a timeout in seconds - confirm).
 */
td_api::object_ptr<td_api::chat> Worker::getChat(int64_t chat_id)
{
	auto query = td_api::make_object<td_api::getChat>(chat_id);

	return td_->send_query_sync<td_api::getChat, td_api::chat>(
		std::move(query),
		30
	);
}
/*
 * Synchronously look up a user by id, logging the id to stdout first. The
 * second argument passed to send_query_sync() is 30 (presumably a timeout in
 * seconds - confirm).
 */
td_api::object_ptr<td_api::user> Worker::getUser(int64_t user_id)
{
	printf("In getUser(): %ld\n", user_id);

	auto query = td_api::make_object<td_api::getUser>(user_id);

	return td_->send_query_sync<td_api::getUser, td_api::user>(
		std::move(query),
		30
	);
}
/*
 * Fetch the chat list and gather the message history of every chat whose id
 * is not positive (positive ids are skipped).
 *
 * The stop flag is checked after the chat list query and again before each
 * chat, so a shutdown request is not delayed by the remaining chats, each of
 * which costs at least one blocking query.
 */
void Worker::gatherMsg(void)
{
	td_api::object_ptr<td_api::chats> chats = getChats();

	if (unlikely(this->stopUpdate())) {
		pr_debug("ret");
		return;
	}

	if (!chats) {
		std::cout << "Chat is null on gatherMsg" << std::endl;
		return;
	}

	for (auto chat_id: chats->chat_ids_) {
		if (unlikely(this->stopUpdate())) {
			pr_debug("ret");
			return;
		}

		if (chat_id > 0)
			continue;

		pr_debug("Gathering from: %ld", chat_id);
		gatherMsgByChatId(chat_id);
	}

	sleep(1);
}
/*
 * Fetch the recent history of `chat_id` and print every text message when the
 * chat title is "GNU/Weeb".
 *
 * Nothing is printed when the stop flag is set, when either query fails, or
 * when the history reports fewer than 5 messages in total.
 *
 * @param chat_id  Telegram chat identifier.
 */
void Worker::gatherMsgByChatId(int64_t chat_id)
{
	td_api::object_ptr<td_api::messages> msgs = getChatHistory(chat_id);

	if (unlikely(this->stopUpdate()))
		return;

	if (!msgs) {
		std::cout
			<< "Messages is null on gatherMsgByChatId"
			<< std::endl;
		return;
	}

	if (msgs->total_count_ < 5)
		return;

	td_api::object_ptr<td_api::chat> chat = getChat(chat_id);
	if (!chat) {
		/* getChat() can fail just like getChatHistory(). */
		std::cout
			<< "Chat is null on gatherMsgByChatId"
			<< std::endl;
		return;
	}

	/* The title does not change per message: test it once, up front. */
	if (chat->title_ != "GNU/Weeb")
		return;

	for (auto &msg: msgs->messages_) {
		if (unlikely(!msg))
			continue;

		auto &content = msg->content_;
		if (!content || content->get_id() != td_api::messageText::ID)
			continue;

		auto &formatted = static_cast<td_api::messageText &>(*content).text_;
		if (!formatted)
			continue;

		/* The message id is printed shifted right by 20 bits. */
		printf("[chat_id: %ld] [title: %s] [msg_id: %ld]: \"%s\"\n",
		       chat->id_,
		       chat->title_.c_str(),
		       (msg->id_ >> 20u),
		       formatted->text_.c_str());
	}
}
/*
 * Synchronously download a whole file through Td.
 *
 * TDLib's downloadFile takes (file_id, priority, offset, limit, synchronous).
 * The arguments are passed in that order here; the previous code passed limit
 * before offset, which only worked because both are 0.
 *
 * @param file_id  Td file identifier.
 * @return         The td_api::file object (callers check for null).
 */
td_api::object_ptr<td_api::file> Worker::downloadFile(int32_t file_id)
{
	const int32_t prio = 32;
	const int32_t offset = 0;	/* start of the file */
	const int32_t limit = 0;	/* 0: no limit on the downloaded size */
	const bool sync = true;

	return td_->send_query_sync<td_api::downloadFile, td_api::file>(
		td_api::make_object<td_api::downloadFile>(
			file_id,
			prio,
			offset,
			limit,
			sync
		),
		30
	);
}
/*
 * Database connection settings. Worker::run() fills these in from the
 * DB_HOST, DB_USER, DB_PASS and DB_NAME environment variables, and
 * createDB() reads them.
 */
const char *dbhost, *dbuser, *dbpass, *dbname;
/*
 * Create a DB handle from the dbhost/dbuser/dbpass/dbname globals and
 * connect it.
 *
 * The handle is held by a unique_ptr until connect() has returned, so it is
 * freed rather than leaked if connect() throws.
 *
 * @return  A connected handle; the caller takes ownership of the pointer.
 */
static inline tgvisd::DB *createDB(void)
{
	std::unique_ptr<tgvisd::DB> db(
		new tgvisd::DB(dbhost, 0, dbuser, dbpass, dbname)
	);

	db->connect();
	return db.release();
}
/*
 * Thread body: load the database settings from the environment (aborting the
 * process when one is missing), wait two seconds, then gather messages.
 */
void Worker::run(void)
{
	struct EnvSetting {
		const char **slot;		/* global to fill in */
		const char *name;		/* environment variable */
		const char *missing_msg;	/* printed before abort() */
	};

	const EnvSetting settings[] = {
		{&dbhost, "DB_HOST", "Missing DB_HOST"},
		{&dbuser, "DB_USER", "Missing DB_USER"},
		{&dbpass, "DB_PASS", "Missing DB_PASS"},
		{&dbname, "DB_NAME", "Missing DB_NAME"},
	};

	for (const EnvSetting &setting: settings) {
		*setting.slot = getenv(setting.name);
		if (*setting.slot)
			continue;

		puts(setting.missing_msg);
		abort();
	}

	sleep(2);

	/* The database connection and event log gathering are disabled. */
	// db_ = createDB();
	gatherMsg();
	// try {
	//
	// getChats();
	// sleep(2);
	// gatherChatEventLog(-1001483770714); // GNU/Weeb
	// gatherChatEventLog(-1001226735471); // Private Cloud
	// } catch (std::string &err) {
	// std::cout << "Error: " << err << std::endl;
	// throw err;
	// }
}
void Worker::gatherChatEventLog(int64_t chat_id)
{
int64_t min_ev = 0;
std::vector<int32_t> user_ids;
printf("In gatherChatEventLog\n");
// auto st = db_->prepare("SELECT MIN(id) FROM track_event_id;");
// st->execute();
// if (auto row = st->fetch()) {
// uint64_t t_min = 0;
// IS_OK(mysqlx_get_uint(row, 0, &t_min), st->getStmt());
// min_ev = t_min;
// pr_debug("min_ev from DB: %ld", min_ev);
// }
while (min_ev != -1) {
auto events = td_->send_query_sync<
td_api::getChatEventLog,
td_api::chatEvents
>(
td_api::make_object<td_api::getChatEventLog>(
chat_id,
"",
min_ev,
100,
nullptr,
std::move(user_ids)
),
30
);
if (!events)
break;
min_ev = processChatEventLog(std::move(events));
pr_debug("min_ev = %ld", min_ev);
}
}
/*
 * Record `event_id` in the track_event_id table.
 *
 * @return  true when the row was inserted, false when the insert threw a
 *          std::string error (treated as "already tracked").
 */
bool Worker::trackEventId(int64_t event_id)
{
	bool is_new = true;

	try {
		auto st = db_->prepare("INSERT INTO track_event_id VALUES (?)");

		st->bind(PARAM_UINT(event_id), PARAM_END);
		st->execute();
	} catch (const std::string &) {
		pr_debug("skip dup: %ld", event_id);
		is_new = false;
	}

	return is_new;
}
/*
 * Store the "message edited" and "message deleted" events of one event log
 * page. Each stored event runs in its own database transaction; events whose
 * id is already tracked are rolled back and skipped.
 *
 * If processing a message throws, the open transaction is rolled back before
 * the exception is propagated, so the connection is not left mid-transaction.
 *
 * @param events  Page of events (may be null).
 * @return        The smallest event id seen on the page, or -1 when the page
 *                is null or empty.
 */
int64_t Worker::processChatEventLog(td_api::object_ptr<td_api::chatEvents> events)
{
	printf("In processChatEventLog\n");

	int64_t min_event = -1;
	bool seen_any = false;

	if (!events)
		return min_event;

	for (auto &ev: events->events_) {
		if (!ev)
			continue;

		if (!seen_any || ev->id_ < min_event) {
			min_event = ev->id_;
			seen_any = true;
		}

		if (!ev->action_)
			continue;

		const auto obj_id = ev->action_->get_id();
		const bool is_delete_ev =
			(obj_id == td_api::chatEventMessageDeleted::ID);
		const bool is_edit_ev =
			(obj_id == td_api::chatEventMessageEdited::ID);

		if (!is_delete_ev && !is_edit_ev)
			continue;

		db_->prepare("START TRANSACTION")->execute();
		if (!trackEventId(ev->id_)) {
			db_->prepare("ROLLBACK")->execute();
			continue;
		}

		try {
			if (is_delete_ev) {
				auto &msg = static_cast<td_api::chatEventMessageDeleted &>
					(*ev->action_).message_;

				/*
				 * NOTE(review): a deletion event is recorded
				 * with is_deleted = false, exactly as before.
				 * Confirm whether it should be true.
				 */
				if (msg)
					processMessage(*msg, false, false);
			} else {
				auto &q = static_cast<td_api::chatEventMessageEdited &>
					(*ev->action_);

				/* Old version first, then the edited one. */
				if (q.old_message_)
					processMessage(*q.old_message_, false, false);
				if (q.new_message_)
					processMessage(*q.new_message_, true, false);
			}
		} catch (...) {
			db_->prepare("ROLLBACK")->execute();
			throw;
		}

		db_->prepare("COMMIT")->execute();
	}

	return min_event;
}
/*
 * Resolve the sender and chat of `msg` to database ids and store the message.
 *
 * Messages that were not sent by a user are ignored, and so are messages
 * whose user or chat lookup fails (the lookups can return null).
 *
 * @param msg         Message to store.
 * @param is_edited   Passed through to resolveMessage().
 * @param is_deleted  Passed through to resolveMessage().
 */
void Worker::processMessage(td_api::message &msg, bool is_edited, bool is_deleted)
{
	printf("In processMessage\n");

	if (!msg.sender_ ||
	    msg.sender_->get_id() != td_api::messageSenderUser::ID)
		return;

	auto user_ = getUser(
		static_cast<td_api::messageSenderUser &>(*msg.sender_).user_id_
	);
	if (!user_) {
		pr_debug("getUser() failed, skipping message");
		return;
	}

	auto chat_ = getChat(msg.chat_id_);
	if (!chat_) {
		pr_debug("getChat() failed, skipping message");
		return;
	}

	/* Out-parameters of resolve*(); not used any further here. */
	td_api::user *user = nullptr;
	td_api::chat *chat = nullptr;

	uint64_t db_user_id = resolveUser(*user_, &user);
	uint64_t db_group_id = resolveGroup(*chat_, &chat);

	(void)user;
	(void)chat;

	resolveMessage(msg, db_user_id, db_group_id, is_edited, is_deleted);
}
/*
 * Format a Unix timestamp as "YYYY-MM-DD HH:MM:SS" in UTC.
 *
 * The minutes field uses %M; the previous format used %I (the 12-hour clock
 * hour) there, which produced wrong timestamps. gmtime_r() is used instead of
 * gmtime() so no shared static buffer is involved.
 *
 * @param epoch   Seconds since the Unix epoch.
 * @param buffer  Destination buffer.
 * @param len     Size of `buffer` in bytes.
 * @return        `buffer`; it holds an empty string when the time cannot be
 *                converted or does not fit (and is left untouched when `len`
 *                is 0).
 */
static char *getDateByUnixTM(time_t epoch, char *buffer, size_t len)
{
	struct tm utc;

	if (len == 0)
		return buffer;

	if (!gmtime_r(&epoch, &utc) ||
	    strftime(buffer, len, "%Y-%m-%d %H:%M:%S", &utc) == 0)
		buffer[0] = '\0';

	return buffer;
}
/*
 * Format the current time into `buffer` (of `len` bytes) using
 * getDateByUnixTM() and return `buffer`.
 */
static char *getDateNow(char *buffer, size_t len)
{
	return getDateByUnixTM(std::time(nullptr), buffer, len);
}
/*
 * Run "SELECT LAST_INSERT_ID();" on `db_`.
 *
 * @return  The first column of the first row, or 0 when no row is returned.
 */
static uint64_t lastInsertId_static(tgvisd::DB *db_)
{
	auto st = db_->prepare("SELECT LAST_INSERT_ID();");
	st->execute();

	auto row = st->fetch();
	if (!row)
		return 0;

	uint64_t id = 0;
	IS_OK(mysqlx_get_uint(row, 0, &id), st->getStmt());
	return id;
}
uint64_t Worker::lastInsertId(void)
{
return lastInsertId_static(db_);
}
/*
 * Map a Telegram user to its gw_users row id, inserting the row if needed.
 *
 * Lookup order: the in-memory cache, then an INSERT (a std::string error from
 * it is ignored), then a SELECT by tg_user_id.
 *
 * @param user    User object. On a fresh resolution it is moved into userMap_.
 * @param user_p  Receives a pointer to the cached copy after a fresh
 *                resolution, otherwise a pointer to `user` itself.
 * @return        The database id, or 0 when it could not be determined.
 */
uint64_t Worker::resolveUser(td_api::user &user, td_api::user **user_p)
{
	const char insert_sql[] =
		"INSERT INTO `gw_users` "
		"("
			"`tg_user_id`,"
			"`username`,"
			"`first_name`,"
			"`last_name`,"
			"`is_bot`,"
			"`created_at`,"
			"`updated_at`"
		")"
		" VALUES (?, ?, ?, ?, '0', ?, NULL);";

	/* Already resolved earlier: answer from the cache. */
	const auto cached = userTgToDb_.find(user.id_);
	if (cached != userTgToDb_.end()) {
		*user_p = &user;
		return cached->second;
	}

	uint64_t db_id = 0;
	bool inserted = true;
	std::unique_ptr<PreparedStatement> st = db_->prepare(insert_sql);

	try {
		char dateBuf[64];
		const char *dateNow = getDateNow(dateBuf, sizeof(dateBuf));

		st->bind(
			PARAM_UINT((uint64_t)user.id_),
			PARAM_STRING(user.username_.c_str()),
			PARAM_STRING(user.first_name_.c_str()),
			PARAM_STRING(user.last_name_.c_str()),
			PARAM_STRING(dateNow),
			PARAM_END
		);
		st->execute();
	} catch (std::string &) {
		inserted = false;
	}

	if (inserted)
		db_id = lastInsertId();

	if (!db_id) {
		/* Insert failed or produced no id: look the row up instead. */
		pr_debug("Fetching from DB (user)...");
		st = db_->prepare("SELECT id FROM gw_users WHERE tg_user_id = ?");
		st->bind(
			PARAM_SINT((uint64_t)user.id_),
			PARAM_END
		);
		st->execute();

		if (auto row = st->fetch()) {
			IS_OK(mysqlx_get_uint(row, 0, &db_id), st->getStmt());
			printf("fetched = %lu\n", db_id);
		}
	}

	if (!db_id) {
		*user_p = &user;
		return db_id;
	}

	userTgToDb_.insert_or_assign(user.id_, db_id);
	userMap_.insert_or_assign(db_id, std::move(user));
	*user_p = &userMap_[db_id];
	return db_id;
}
/*
 * Map a Telegram chat to its gw_groups row id, inserting the row if needed.
 *
 * Lookup order: the in-memory cache, then an INSERT (a std::string error from
 * it is ignored), then a SELECT by tg_group_id.
 *
 * groupMap_ is keyed by the Telegram chat id. The pointer handed back through
 * `chat_p` is taken from the entry that was just stored; the previous code
 * looked the entry up by database id, which default-inserted an empty chat
 * and returned a pointer to that instead.
 *
 * @param chat    Chat object. On a fresh resolution it is moved into
 *                groupMap_.
 * @param chat_p  Receives a pointer to the cached copy after a fresh
 *                resolution, otherwise a pointer to `chat` itself.
 * @return        The database id, or 0 when it could not be determined.
 */
int64_t Worker::resolveGroup(td_api::chat &chat, td_api::chat **chat_p)
{
	const char query[] =
		"INSERT INTO `gw_groups` "
		"("
			"`tg_group_id`,"
			"`username`,"
			"`name`,"
			"`link`,"
			"`created_at`,"
			"`updated_at`"
		")"
		" VALUES (?, ?, ?, NULL, ?, NULL);";

	/* Copy the id now; `chat` may be moved from further down. */
	const int64_t tg_group_id = chat.id_;

	const auto cached = groupTgToDb_.find(tg_group_id);
	if (cached != groupTgToDb_.end()) {
		*chat_p = &chat;
		return cached->second;
	}

	uint64_t ret = 0;
	bool inserted = true;
	std::unique_ptr<PreparedStatement> st = db_->prepare(query);

	try {
		char dateBuf[64];
		const char *dateNow = getDateNow(dateBuf, sizeof(dateBuf));

		st->bind(
			PARAM_SINT((uint64_t)chat.id_),
			/* TODO: Adjust the group username dynamically. */
			PARAM_STRING("GNUWeeb"),
			PARAM_STRING(chat.title_.c_str()),
			PARAM_STRING(dateNow),
			PARAM_END
		);
		st->execute();
	} catch (std::string &) {
		inserted = false;
	}

	if (inserted)
		ret = lastInsertId();

	if (!ret) {
		/* Insert failed or produced no id: look the row up instead. */
		pr_debug("Fetching from DB (group)...");
		st = db_->prepare("SELECT id FROM gw_groups WHERE tg_group_id = ?");
		st->bind(
			PARAM_SINT((uint64_t)chat.id_),
			PARAM_END
		);
		st->execute();

		if (auto row = st->fetch()) {
			IS_OK(mysqlx_get_uint(row, 0, &ret), st->getStmt());
			printf("fetched = %lu\n", ret);
		}
	}

	if (!ret) {
		*chat_p = &chat;
		return ret;
	}

	groupTgToDb_.insert_or_assign(tg_group_id, ret);
	auto slot = groupMap_.insert_or_assign(tg_group_id, std::move(chat)).first;
	*chat_p = &slot->second;
	return ret;
}
/*
 * Store the forward information of a message in gw_group_message_forward.
 *
 * The origin decides the stored type, origin id and origin text:
 *   user         -> id = sender user id, text = ""
 *   hidden_user  -> id = 0,              text = sender name
 *   channel      -> id = 0,              text = author signature
 *   chat         -> id = sender chat id, text = author signature
 * Any other origin kind, or a missing origin, stores nothing.
 *
 * A std::string error from the insert is printed to stdout and rethrown for
 * every origin kind.
 *
 * @param db_msg_id  Row id of the message in gw_group_messages.
 * @param fwd        Forward information of that message.
 */
void Worker::saveForwardInfo(uint64_t db_msg_id, td_api::messageForwardInfo &fwd)
{
	const char query[] =
		"INSERT INTO `gw_group_message_forward` "
		"("
			"`msg_id`,"
			"`origin_type`,"
			"`orig_date`,"
			"`pas_at`,"
			"`orig_id`,"
			"`origin_text`,"
			"`from_chat_id`,"
			"`from_msg_id`"
		") VALUES (?, ?, ?, ?, ?, ?, ?, ?);";
	const char *orig_type = nullptr;
	const char *orig_text = "";
	uint64_t orig_id = 0;

	if (!fwd.origin_)
		return;

	switch (fwd.origin_->get_id()) {
	case td_api::messageForwardOriginUser::ID: {
		auto &o = static_cast<td_api::messageForwardOriginUser &>
			(*fwd.origin_);
		orig_type = "user";
		orig_id = (uint64_t)o.sender_user_id_;
		break;
	}
	case td_api::messageForwardOriginHiddenUser::ID: {
		auto &o = static_cast<td_api::messageForwardOriginHiddenUser &>
			(*fwd.origin_);
		orig_type = "hidden_user";
		orig_text = o.sender_name_.c_str();
		break;
	}
	case td_api::messageForwardOriginChannel::ID: {
		auto &o = static_cast<td_api::messageForwardOriginChannel &>
			(*fwd.origin_);
		orig_type = "channel";
		orig_text = o.author_signature_.c_str();
		break;
	}
	case td_api::messageForwardOriginChat::ID: {
		auto &o = static_cast<td_api::messageForwardOriginChat &>
			(*fwd.origin_);
		orig_type = "chat";
		orig_id = (uint64_t)o.sender_chat_id_;
		orig_text = o.author_signature_.c_str();
		break;
	}
	default:
		return;
	}

	char dateBuf[64];
	const char *msgDate = getDateByUnixTM(fwd.date_, dateBuf, sizeof(dateBuf));
	const char *pas = fwd.public_service_announcement_type_.c_str();
	auto st = db_->prepare(query);

	try {
		st->execute(
			PARAM_UINT(db_msg_id),
			PARAM_STRING(orig_type),
			PARAM_STRING(msgDate),
			PARAM_STRING(pas),
			PARAM_UINT(orig_id),
			PARAM_STRING(orig_text),
			PARAM_UINT(fwd.from_chat_id_),
			PARAM_UINT(fwd.from_message_id_),
			PARAM_END
		);
	} catch (std::string &err) {
		std::cout << err << std::endl;
		/* Rethrow the original exception object instead of a copy. */
		throw;
	}
}
/*
 * Set has_edited_msg = '1' and refresh updated_at on one gw_group_messages
 * row, selected by its id and group id.
 */
void Worker::setHasEditedToTrue(uint64_t db_msg_id, uint64_t db_group_id)
{
	const char update_sql[] =
		"UPDATE gw_group_messages SET has_edited_msg = '1', "
		"updated_at = ? WHERE "
		"id = ? AND group_id = ? LIMIT 1";
	char stamp[64];
	const char *dateNow = getDateNow(stamp, sizeof(stamp));

	auto st = db_->prepare(update_sql);

	st->execute(
		PARAM_STRING(dateNow),
		PARAM_UINT(db_msg_id),
		PARAM_SINT(db_group_id),
		PARAM_END
	);
}
/*
 * Set is_deleted = '1' and refresh updated_at on one gw_group_messages row,
 * selected by its id and group id.
 *
 * The two SET assignments are separated by a comma; without it the statement
 * is not valid SQL.
 */
void Worker::setIsDeletedToTrue(uint64_t db_msg_id, uint64_t db_group_id)
{
	char dateBuf[64];
	const char *dateNow = getDateNow(dateBuf, sizeof(dateBuf));
	auto st = db_->prepare(
		"UPDATE gw_group_messages SET is_deleted = '1', "
		"updated_at = ? WHERE "
		"id = ? AND group_id = ? LIMIT 1"
	);

	st->execute(
		PARAM_STRING(dateNow),
		PARAM_UINT(db_msg_id),
		PARAM_SINT(db_group_id),
		PARAM_END
	);
}
/*
 * Look up a stored message by Telegram message id and Telegram chat id.
 *
 * @param tg_msg_id       Message id as stored in gw_group_messages.tg_msg_id.
 * @param chat_id         Telegram chat id (matched against
 *                        gw_groups.tg_group_id).
 * @param has_edited_msg  Optional; receives true when the row's
 *                        has_edited_msg column starts with '1', and false
 *                        otherwise or when no row exists.
 * @return                The database id of the message, or 0 when not found.
 */
uint64_t Worker::getDBMsgIdIfExists(uint64_t tg_msg_id, uint64_t chat_id,
				    bool *has_edited_msg)
{
	auto st = db_->prepare(
		"SELECT a.id, a.has_edited_msg FROM gw_group_messages AS a "
		"INNER JOIN gw_groups AS b ON b.id = a.group_id WHERE "
		"a.tg_msg_id = ? AND b.tg_group_id = ?"
	);

	st->execute(
		PARAM_UINT(tg_msg_id),
		PARAM_SINT(chat_id),
		PARAM_END
	);

	auto row = st->fetch();
	if (row) {
		uint64_t ret = 0;

		IS_OK(mysqlx_get_uint(row, 0, &ret), st->getStmt());
		if (has_edited_msg) {
			/*
			 * Zero-filled so that a column which yields no bytes
			 * reads as "not edited" instead of stack garbage.
			 */
			char buffer[5] = {0};
			size_t len = sizeof(buffer);

			IS_OK(mysqlx_get_bytes(row, 1, 0, buffer, &len),
			      st->getStmt());
			*has_edited_msg = (*buffer == '1');
		}
		return ret;
	}

	if (has_edited_msg)
		*has_edited_msg = false;
	return 0;
}
/*
 * Store a message, or update the flags of an already stored one.
 *
 * - Unsupported content types are skipped.
 * - New message: a gw_group_messages row is inserted, followed by the forward
 *   info (if any) and the message data.
 * - Known message: has_edited_msg is raised when an edit arrives for the
 *   first time; message data is stored again only for edits.
 * - In both cases is_deleted is raised when `is_deleted` is true.
 *
 * The existence check uses the message id shifted right by 20 bits, which is
 * the value written to tg_msg_id by the INSERT below. Looking up the
 * unshifted id never matched a stored row.
 *
 * @param msg          Message to store.
 * @param db_user_id   gw_users row id of the sender.
 * @param db_group_id  gw_groups row id of the chat.
 * @param is_edited    True when `msg` is the edited version of a message.
 * @param is_deleted   True when the message has been deleted.
 * @return             Always 0.
 */
uint64_t Worker::resolveMessage(td_api::message &msg, uint64_t db_user_id,
				uint64_t db_group_id, bool is_edited,
				bool is_deleted)
{
	int64_t tg_group_id;
	uint64_t tg_msg_id;
	uint64_t db_msg_id;
	bool has_edited_msg = false;
	bool store_data = true;
	const char query[] =
		"INSERT INTO `gw_group_messages`"
		"("
			"`group_id`,"
			"`user_id`,"
			"`tg_msg_id`,"
			"`reply_to_tg_msg_id`,"
			"`msg_type`,"
			"`has_edited_msg`,"
			"`is_forwarded_msg`,"
			"`is_deleted`,"
			"`created_at`,"
			"`updated_at`"
		") VALUES (?, ?, ?, ?, ?, '0', ?, ?, ?, NULL);";
	const char *msg_type = NULL;

	if (!msg.content_) {
		pr_debug("Message has no content, skipping...");
		return 0;
	}

	switch (msg.content_->get_id()) {
	case td_api::messageText::ID:
		msg_type = "text";
		break;

	/* May have files */
	case td_api::messagePhoto::ID:
		msg_type = "photo";
		break;
	case td_api::messageVideo::ID:
		msg_type = "video";
		break;
	case td_api::messageSticker::ID:
		msg_type = "sticker";
		break;
	case td_api::messageAnimation::ID:
		msg_type = "animation";
		break;
	case td_api::messageDocument::ID:
		msg_type = "document";
		break;
	case td_api::messageAudio::ID:
		msg_type = "audio";
		break;

	/* Event */
	case td_api::messageChatJoinByLink::ID:
		msg_type = "join";
		break;

	default:
		pr_debug("Got default case, skipping...");
		return 0;
	}

	tg_group_id = (int64_t)msg.chat_id_;
	/* Same shifted id that the INSERT stores in tg_msg_id. */
	tg_msg_id = (uint64_t)msg.id_ >> 20u;

	db_msg_id = getDBMsgIdIfExists(tg_msg_id, tg_group_id, &has_edited_msg);
	if (db_msg_id != 0) {
		if (is_edited && !has_edited_msg)
			setHasEditedToTrue(db_msg_id, db_group_id);

		/* A known, unedited message already has its data row. */
		store_data = is_edited;
	} else {
		pr_debug("Saving msg_type: %s", msg_type);
		{
			char dateBuf[64];
			const char *is_forwarded_msg = msg.forward_info_ ? "1" : "0";
			const char *dateNow = getDateNow(dateBuf, sizeof(dateBuf));
			auto st = db_->prepare(query);

			st->execute(
				PARAM_UINT(db_group_id),
				PARAM_UINT(db_user_id),
				PARAM_UINT(msg.id_ >> 20u),
				PARAM_UINT(msg.reply_to_message_id_ >> 20u),
				PARAM_STRING(msg_type),
				PARAM_STRING(is_forwarded_msg),
				PARAM_STRING(is_deleted ? "1" : "0"),
				PARAM_STRING(dateNow),
				PARAM_END
			);
		}

		db_msg_id = lastInsertId();
		if (msg.forward_info_)
			saveForwardInfo(db_msg_id, *msg.forward_info_);
	}

	if (store_data)
		insertMsgData(db_msg_id, msg, is_edited);

	if (is_deleted)
		setIsDeletedToTrue(db_msg_id, db_group_id);

	return 0;
}
/*
 * Dispatch to the writer that matches the content type of `msg`. Content
 * types without a writer are ignored.
 */
void Worker::insertMsgData(uint64_t db_msg_id, td_api::message &msg,
			   bool is_edited)
{
	const auto kind = msg.content_->get_id();

	if (kind == td_api::messageText::ID)
		insertMsgDataText(db_msg_id, msg, is_edited);
	else if (kind == td_api::messagePhoto::ID)
		insertMsgDataPhoto(db_msg_id, msg, is_edited);
	else if (kind == td_api::messageVideo::ID)
		insertMsgDataVideo(db_msg_id, msg, is_edited);
	else if (kind == td_api::messageSticker::ID)
		insertMsgDataSticker(db_msg_id, msg, is_edited);
	else if (kind == td_api::messageAnimation::ID)
		insertMsgDataAnimation(db_msg_id, msg, is_edited);
	else if (kind == td_api::messageDocument::ID)
		insertMsgDataDocument(db_msg_id, msg, is_edited);
	else if (kind == td_api::messageAudio::ID)
		insertMsgDataAudio(db_msg_id, msg, is_edited);
}
/*
 * Store the text (and its entities, if any) of a text message in
 * gw_group_message_data. The `file` column is always NULL here.
 *
 * The serialized entity string lives at function scope so that the pointer
 * handed to bind() is still valid when execute() runs. Previously the string
 * was destroyed at the end of the `if` block, before execute().
 * NOTE(review): whether bind() copies its string arguments is not visible
 * here; keeping the string alive is correct either way.
 *
 * @param db_msg_id  Row id of the message in gw_group_messages.
 * @param msg        Message whose content is a td_api::messageText.
 * @param is_edited  Selects edit_date_ over date_ and sets is_edited.
 */
void Worker::insertMsgDataText(uint64_t db_msg_id, td_api::message &msg,
			       bool is_edited)
{
	const char query[] =
		"INSERT INTO `gw_group_message_data`"
		"("
			"`msg_id`,"
			"`text`,"
			"`text_entities`,"
			"`file`,"
			"`is_edited`,"
			"`tg_date`,"
			"`created_at`"
		") VALUES (?, ?, ?, NULL, ?, ?, ?);";
	auto &msgText = static_cast<td_api::messageText &>(*msg.content_);
	auto st = db_->prepare(query);
	char dateBuf1[64], dateBuf2[64];
	const char *msgDate = getDateByUnixTM(
		is_edited ? msg.edit_date_ : msg.date_, dateBuf1,
		sizeof(dateBuf1)
	);
	const char *dateNow = getDateNow(dateBuf2, sizeof(dateBuf2));
	std::string en;	/* must outlive st->execute() */

	if (!msgText.text_->entities_.empty()) {
		en = to_string(msgText.text_->entities_);
		st->bind(
			PARAM_UINT(db_msg_id),
			PARAM_STRING(msgText.text_->text_.c_str()),
			PARAM_STRING(en.c_str()),
			PARAM_STRING(is_edited ? "1" : "0"),
			PARAM_STRING(msgDate),
			PARAM_STRING(dateNow),
			PARAM_END
		);
	} else {
		st->bind(
			PARAM_UINT(db_msg_id),
			PARAM_STRING(msgText.text_->text_.c_str()),
			PARAM_NULL(),
			PARAM_STRING(is_edited ? "1" : "0"),
			PARAM_STRING(msgDate),
			PARAM_STRING(dateNow),
			PARAM_END
		);
	}

	st->execute();
}
/*
 * Return the extension of a file name or path, without the dot.
 *
 * Only the last path component is examined, so a dot inside a directory name
 * ("/tmp/cache.d/file") is not mistaken for an extension separator. A
 * component that merely starts with a dot (".hidden") has no extension.
 *
 * @param filename  NUL-terminated file name or '/'-separated path.
 * @return          Pointer into `filename` just past the last dot of the last
 *                  component, or "" when there is no extension (or `filename`
 *                  is null).
 */
static const char *get_filename_ext(const char *filename)
{
	if (!filename)
		return "";

	const char *slash = strrchr(filename, '/');
	const char *base = slash ? slash + 1 : filename;
	const char *dot = strrchr(base, '.');

	if (!dot || dot == base)
		return "";

	return dot + 1;
}
/*
 * Download a file, move it to "storage/files/<md5>_<sha1>.<ext>" and record
 * it in gw_files.
 *
 * After the rename, the new file is hard-linked back to the original download
 * path. Failures of rename() and link() are only logged.
 *
 * @param db    Database handle used for the insert.
 * @param file  Td file to download and record.
 * @return      The gw_files row id, or 0 when the download, the local path or
 *              one of the digests is unavailable.
 * @throws      Rethrows the std::string error of the insert after printing
 *              it to stdout.
 */
uint64_t Worker::resolveFile(tgvisd::DB *db, td_api::file &file)
{
	unsigned char md5_buf[16], sha1_buf[20];
	uint64_t ret = 0;

	pr_debug("Downloading file...");
	auto f = downloadFile(file.id_);
	if (!f) {
		pr_debug("Download fail?");
		return ret;
	}
	pr_debug("Downloaded OK!");

	auto &loc = f->local_;
	if (!loc || loc->path_.empty()) {
		pr_debug("Cannot find file path");
		return ret;
	}

	const char *path = loc->path_.c_str();

	if (!md5_file(path, md5_buf)) {
		pr_debug("md5_file failed: %s", strerror(errno));
		return ret;
	}

	if (!sha1_file(path, sha1_buf)) {
		pr_debug("sha1_file failed: %s", strerror(errno));
		return ret;
	}

	char new_fn[1024];
	char md5_dg[sizeof(md5_buf) * 2 + 1] = {0};
	char sha1_dg[sizeof(sha1_buf) * 2 + 1] = {0};
	const char *file_ext = get_filename_ext(path);

	make_digest_ex(md5_dg, md5_buf, sizeof(md5_buf));
	make_digest_ex(sha1_dg, sha1_buf, sizeof(sha1_buf));

	snprintf(new_fn, sizeof(new_fn),
		 "storage/files/%s_%s.%s", md5_dg, sha1_dg, file_ext);
	pr_debug("newp: %s", new_fn);

	if (rename(path, new_fn)) {
		pr_debug("rename(): %s", strerror(errno));
	}

	if (link(new_fn, path)) {
		pr_debug("link(): %s", strerror(errno));
	}

	/* The remote part may be absent; store empty ids in that case. */
	const char *remote_id = file.remote_ ? file.remote_->id_.c_str() : "";
	const char *remote_uniq_id =
		file.remote_ ? file.remote_->unique_id_.c_str() : "";

	try {
		const char query[] =
			"INSERT INTO `gw_files` "
			"("
				"`tg_file_id`,"
				"`tg_uniq_id`,"
				"`md5_sum`,"
				"`sha1_sum`,"
				"`ext`,"
				"`size`,"
				"`created_at`"
			") VALUES (?, ?, ?, ?, ?, ?, ?)";
		char dateBuf[64];
		const char *dateNow = getDateNow(dateBuf, sizeof(dateBuf));
		auto st = db->prepare(query);

		st->execute(
			PARAM_STRING(remote_id),
			PARAM_STRING(remote_uniq_id),
			PARAM_BYTES(md5_buf, sizeof(md5_buf)),
			PARAM_BYTES(sha1_buf, sizeof(sha1_buf)),
			PARAM_STRING(file_ext),
			PARAM_UINT((uint64_t)loc->downloaded_size_),
			PARAM_STRING(dateNow),
			PARAM_END
		);
		ret = lastInsertId_static(db);
	} catch (std::string &err) {
		std::cout << err << std::endl;
		/* Rethrow the original exception object instead of a copy. */
		throw;
	}

	return ret;
}
// #define MAX_WRK (100u)
// static std::mutex wrk_mut;
// static std::condition_variable wrk_cond;
// static std::atomic<uint32_t> wrk_online = 0;
// static bool init = false;
// struct db_pool {
// tgvisd::DB *db;
// uint16_t idx;
// };
// std::mutex pool_mutex;
// static std::vector<struct db_pool> wrk_db_pool;
// static std::stack<uint16_t> wrk_db_pstack;
// static void putWrkDb(struct db_pool *pl)
// {
// pool_mutex.lock();
// wrk_db_pstack.push(pl->idx);
// pool_mutex.unlock();
// }
// static struct db_pool *getWrkDb(void)
// {
// struct db_pool *ret = nullptr;
// uint16_t idx;
// pool_mutex.lock();
// if (!wrk_db_pstack.empty()) {
// idx = wrk_db_pstack.top();
// wrk_db_pstack.pop();
// if (!wrk_db_pool[idx].db) {
// wrk_db_pool[idx].db = createDB();
// }
// ret = &wrk_db_pool[idx];
// }
// pool_mutex.unlock();
// return ret;
// }
// static void initWrkDb(void)
// {
// if (!init) {
// init = true;
// wrk_db_pool.reserve(200);
// for (uint16_t i = 200; i--;) {
// struct db_pool pl;
// pl.db = nullptr;
// pl.idx = i;
// wrk_db_pool.push_back(pl);
// putWrkDb(&pl);
// }
// }
// }
// static void wrk_wait(void)
// {
// pr_debug("wrk acquiring mutex...");
// std::unique_lock<std::mutex> lock(wrk_mut);
// while (atomic_load(&wrk_online) >= MAX_WRK) {
// pr_debug("Waiting for workers to finish (count=%u)...",
// atomic_load(&wrk_online));
// wrk_cond.wait_for(lock, 1000ms, [](){
// return !(atomic_load(&wrk_online) >= MAX_WRK);
// });
// }
// pr_debug("Wrk go! (count=%u)", atomic_load(&wrk_online));
// initWrkDb();
// }
/*
 * Store a document message: the caption text, the serialized caption entities
 * (only when there are any) and the document file.
 */
void Worker::insertMsgDataDocument(uint64_t db_msg_id, td_api::message &msg,
				   bool is_edited)
{
	auto &content = static_cast<td_api::messageDocument &>(*msg.content_);
	auto &caption = content.caption_;
	const bool has_entities = !caption->entities_.empty();
	std::string entities_str;

	if (has_entities)
		entities_str = to_string(caption->entities_);

	insertMsgData__file(
		db_msg_id, msg,
		caption->text_.c_str(),
		has_entities ? entities_str.c_str() : nullptr,
		*content.document_->document_,
		is_edited
	);
}
void Worker::insertMsgDataAudio(uint64_t db_msg_id, td_api::message &msg,
bool is_edited)
{
auto &msgAudio = static_cast<td_api::messageAudio &>(*msg.content_);
const char *text = msgAudio.caption_->text_.c_str();
std::string en;
const char *text_entities = NULL;
auto &text_obj = msgAudio.caption_;
if (text_obj->entities_.size() > 0) {
en = to_string(text_obj->entities_);
text_entities = en.c_str();
}
insertMsgData__file(db_msg_id, msg, text, text_entities,
*msgAudio.audio_->audio_, is_edited);
}
void Worker::insertMsgDataVideo(uint64_t db_msg_id, td_api::message &msg,
bool is_edited)
{
auto &msgVideo = static_cast<td_api::messageVideo &>(*msg.content_);
const char *text = msgVideo.caption_->text_.c_str();
std::string en;
const char *text_entities = NULL;
auto &text_obj = msgVideo.caption_;
if (text_obj->entities_.size() > 0) {
en = to_string(text_obj->entities_);
text_entities = en.c_str();
}
insertMsgData__file(db_msg_id, msg, text, text_entities,
*msgVideo.video_->video_, is_edited);
}
/*
 * Store an animation message: the caption text, the serialized caption
 * entities (only when there are any) and the animation file.
 */
void Worker::insertMsgDataAnimation(uint64_t db_msg_id, td_api::message &msg,
				    bool is_edited)
{
	auto &content = static_cast<td_api::messageAnimation &>(*msg.content_);
	auto &caption = content.caption_;
	const bool has_entities = !caption->entities_.empty();
	std::string entities_str;

	if (has_entities)
		entities_str = to_string(caption->entities_);

	insertMsgData__file(
		db_msg_id, msg,
		caption->text_.c_str(),
		has_entities ? entities_str.c_str() : nullptr,
		*content.animation_->animation_,
		is_edited
	);
}
void Worker::insertMsgDataSticker(uint64_t db_msg_id, td_api::message &msg,
bool is_edited)
{
auto &msgSticker = static_cast<td_api::messageSticker &>(*msg.content_);
const char *text = msgSticker.sticker_->emoji_.c_str();
insertMsgData__file(db_msg_id, msg, text, NULL,
*msgSticker.sticker_->sticker_, is_edited);
}
void Worker::insertMsgDataPhoto(uint64_t db_msg_id, td_api::message &msg,
bool is_edited)
{
auto &msgPhoto = static_cast<td_api::messagePhoto &>(*msg.content_);
const char *text = msgPhoto.caption_->text_.c_str();
td_api::file *taken_file = nullptr;
auto &photos = msgPhoto.photo_->sizes_;
for (auto &photo: photos) {
auto &pfile = photo->photo_;
if (!taken_file) {
taken_file = pfile.get();
} else {
if (taken_file->size_ < pfile->size_)
taken_file = pfile.get();
}
}
if (!taken_file)
return;
std::string en;
const char *text_entities = NULL;
auto &text_obj = msgPhoto.caption_;
if (text_obj->entities_.size() > 0) {
en = to_string(text_obj->entities_);
text_entities = en.c_str();
}
insertMsgData__file(db_msg_id, msg, text, text_entities, *taken_file,
is_edited);
}
void Worker::insertMsgData__file(uint64_t db_msg_id, td_api::message &msg,
const char *text, const char *text_entities,
td_api::file &file, bool is_edited)
{
// struct db_pool *dbp = getWrkDb();
// if (!dbp) {
// puts("DB pool is full!");
// abort();
// }
uint64_t file_id;
auto db = db_; // dbp->db;
file_id = resolveFile(db, file);
const char query[] =
"INSERT INTO `gw_group_message_data`" \
"(" \
"`msg_id`," \
"`text`," \
"`text_entities`," \
"`file`," \
"`is_edited`," \
"`tg_date`," \
"`created_at`" \
") VALUES (?, ?, ?, ?, ?, ?, ?);";
auto st = db_->prepare(query);
char dateBuf1[64], dateBuf2[64];
const char *msgDate = getDateByUnixTM(
is_edited ? msg.edit_date_ : msg.date_, dateBuf1,
sizeof(dateBuf1)
);
const char *dateNow = getDateNow(dateBuf2, sizeof(dateBuf2));
if (text_entities) {
st->execute(
PARAM_UINT(db_msg_id),
PARAM_STRING(text),
PARAM_STRING(text_entities),
PARAM_UINT(file_id),
PARAM_STRING(is_edited ? "1" : "0"),
PARAM_STRING(msgDate),
PARAM_STRING(dateNow),
PARAM_END
);
} else {
st->execute(
PARAM_UINT(db_msg_id),
PARAM_STRING(text),
PARAM_NULL(),
PARAM_UINT(file_id),
PARAM_STRING(is_edited ? "1" : "0"),
PARAM_STRING(msgDate),
PARAM_STRING(dateNow),
PARAM_END
);
}
// putWrkDb(dbp);
}
} /* namespace tgvisd::Modules::Preload */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment