Last active
June 15, 2023 17:36
-
-
Save dong-zeyu/1f38a20b7b84b9775447aca1c6dd43a3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hmac
import json
import logging
import os

from telegram import Update
from telegram.ext import CallbackContext, CommandHandler, MessageHandler, Updater, Filters
class BroadcastBot:
    """Telegram bot that relays messages to subscribers and administrators.

    Chats subscribe with ``/start`` (optionally gated by an access code),
    unsubscribe with ``/stop``, manage keyword filters with ``/filter`` and
    silence a repeating alert with ``/read``.  ``/admin <password>`` grants
    admin rights and a bare ``/admin`` revokes them.  Plain text sent to the
    bot is forwarded to every admin.

    State (``subscriber``: chat id -> keyword list, ``admins``: list of chat
    ids) is persisted as JSON in ``data_file``.
    """

    # Seconds between repetitions of an alert; override on a subclass or an
    # instance to change the cadence.
    ALERT_INTERVAL = 5

    _log = logging.getLogger(__name__)

    def __init__(self, token, data_file, admin_password, access_password=None):
        """Create the updater, register all handlers and load saved state.

        Args:
            token: Bot token handed to ``Updater``.
            data_file: Path of the JSON file holding subscribers and admins.
            admin_password: Secret expected by ``/admin <password>``.
            access_password: Optional access code required before ``/start``
                subscribes a chat; ``None`` disables the check.
        """
        self.updater = Updater(token)
        dispatcher = self.updater.dispatcher
        dispatcher.add_handler(CommandHandler("start", self._start))
        dispatcher.add_handler(CommandHandler("stop", self._stop))
        dispatcher.add_handler(CommandHandler("read", self._read))
        dispatcher.add_handler(CommandHandler("filter", self._filter))
        dispatcher.add_handler(CommandHandler("admin", self._admin))
        dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, self._dump))
        # Log handler failures instead of discarding them silently.
        dispatcher.add_error_handler(self._on_error)
        self.data_file = data_file
        self.admin_password = admin_password
        self.auth = access_password is not None
        # Always defined so no code path depends on whether auth is enabled.
        self.pending = set()
        self.access_password = access_password
        self.load()

    @staticmethod
    def _secret_matches(supplied, expected) -> bool:
        """Compare two secrets in constant time (UTF-8 encoded)."""
        if supplied is None or expected is None:
            return False
        return hmac.compare_digest(
            str(supplied).encode("utf-8"), str(expected).encode("utf-8")
        )

    def _on_error(self, update, context) -> None:
        """Error handler: record the exception raised while handling *update*."""
        self._log.error(
            "Unhandled error while processing update %r",
            update,
            exc_info=getattr(context, "error", None),
        )

    def load(self):
        """Read subscribers and admins from ``data_file``; start empty if absent."""
        if not os.path.isfile(self.data_file):
            self.admins = []
            self.subscriber = {}
            return
        # Explicit UTF-8: save() writes non-ASCII keywords unescaped.
        with open(self.data_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.admins = list(data.get("admins", []))
        # JSON object keys are strings; chat ids are integers.
        self.subscriber = {int(k): v for k, v in data.get("subscriber", {}).items()}

    def save(self):
        """Persist subscribers and admins to ``data_file`` atomically."""
        payload = {"subscriber": self.subscriber, "admins": self.admins}
        # Write to a sibling file and rename it into place so an interrupted
        # write cannot leave a truncated state file behind.
        tmp_path = f"{self.data_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False)
        os.replace(tmp_path, self.data_file)

    def remove_job_if_exists(self, name) -> bool:
        """Schedule removal of every queued job called *name*.

        Returns:
            ``True`` if at least one job was found, ``False`` otherwise.
        """
        current_jobs = self.updater.dispatcher.job_queue.get_jobs_by_name(name)
        if not current_jobs:
            return False
        for job in current_jobs:
            job.schedule_removal()
        return True

    def add_subscriber(self, update: Update):
        """Subscribe the chat of *update* with an empty filter list and confirm."""
        chat_id = update.message.chat_id
        self.subscriber[chat_id] = []
        self.save()
        self.send_admin(f"New Subscriber {chat_id}")
        update.message.reply_text('Subscribed messages! Use /stop to unsubscribe.')

    def _start(self, update: Update, context: CallbackContext) -> None:
        """``/start``: subscribe the chat, or ask for the access code first."""
        message = update.message
        if message is None:  # e.g. an edited message; nothing to reply to
            return
        chat_id = message.chat_id
        if chat_id in self.subscriber:
            message.reply_text('Already Subscribed! Use /stop to unsubscribe.')
            return
        if self.auth:
            self.pending.add(chat_id)
            message.reply_text('Please enter the access code:')
        else:
            self.add_subscriber(update)

    def _stop(self, update: Update, context: CallbackContext) -> None:
        """``/stop``: unsubscribe the chat and cancel its alert and pending auth."""
        message = update.message
        if message is None:
            return
        chat_id = message.chat_id
        if chat_id in self.subscriber:
            del self.subscriber[chat_id]
            self.save()
            self.remove_job_if_exists(f"alert_{chat_id}")
            message.reply_text('Unsubscribed. Use /start to subscribe again.')
            return
        self.pending.discard(chat_id)
        message.reply_text('Not subscribing. Use /start to subscribe.')

    def _read(self, update: Update, context: CallbackContext) -> None:
        """``/read``: stop the repeating alert for the chat."""
        message = update.message
        if message is None:
            return
        chat_id = message.chat_id
        self.remove_job_if_exists(f"alert_{chat_id}")
        self.updater.bot.send_message(chat_id=chat_id, text="Stopped alerting message.")

    def _filter(self, update: Update, context: CallbackContext) -> None:
        """``/filter [kw ...]``: add keywords, or clear the list when none given."""
        message = update.message
        if message is None:
            return
        chat_id = message.chat_id
        if chat_id not in self.subscriber:
            message.reply_text('Not subscribing. Use /start to subscribe.')
            return
        if not context.args:
            self.subscriber[chat_id] = []
            message.reply_text('Cleared filter list.')
        else:
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the list shown to the user is stable between calls.
            merged = list(dict.fromkeys([*self.subscriber[chat_id], *context.args]))
            self.subscriber[chat_id] = merged
            message.reply_text(f"Filter List: {'|'.join(merged)}")
        self.save()

    def _dump(self, update: Update, context: CallbackContext) -> None:
        """Plain text: check a pending access code, else forward to admins."""
        message = update.message
        if message is None:
            return
        chat_id = message.chat_id
        text = message.text
        if self.auth and chat_id in self.pending:
            # Leave the pending state first so the chat gets exactly one try
            # even if a reply below fails.
            self.pending.discard(chat_id)
            if self._secret_matches(text, self.access_password):
                self.add_subscriber(update)
            else:
                message.reply_text('Wrong access code! This incident will be reported.')
                self.send_admin(f"[{chat_id}]: Wrong access code {text}")
            return
        self.send_admin(f"[{chat_id}]: {text}")

    def _admin(self, update: Update, context: CallbackContext) -> None:
        """``/admin``: revoke admin rights; ``/admin <password>``: grant them."""
        message = update.message
        if message is None:
            return
        chat_id = message.chat_id
        args = context.args or []
        if not args:
            if chat_id in self.admins:
                message.reply_text("Revoke admin access!")
                self.admins.remove(chat_id)
                self.save()
            return
        if len(args) != 1 or not self._secret_matches(args[0], self.admin_password):
            return  # wrong or malformed password is ignored without a reply
        if chat_id in self.admins:
            message.reply_text("Already have the admin access!")
            return
        message.reply_text("Grant admin access!")
        # Existing admins are notified before the new one is added.
        self.send_admin(f"Grant admin for {chat_id}")
        self.admins.append(chat_id)
        self.save()

    def send_admin(self, text, *args, **kwargs):
        """Send *text* to every admin; extra arguments go to ``send_message``.

        Returns:
            List of admin chat ids whose delivery failed.
        """
        return self.broadcast_msg(text, list(self.admins), *args, **kwargs)

    def broadcast_msg(self, text, chat_ids=None, *args, **kwargs):
        """Send *text* to *chat_ids*, or to all subscribers when it is ``None``.

        Extra positional and keyword arguments are passed to
        ``bot.send_message`` after ``chat_id`` and ``text``.  A failure for
        one recipient is logged and does not stop delivery to the others.

        Returns:
            List of chat ids whose delivery failed.
        """
        # Snapshot the recipients: handlers may mutate the collections while
        # this loop is running.
        recipients = list(self.subscriber) if chat_ids is None else list(chat_ids)
        failed = []
        for chat_id in recipients:
            try:
                # chat_id/text are positional so *args cannot collide with them.
                self.updater.bot.send_message(chat_id, text, *args, **kwargs)
            except Exception:  # best-effort fan-out: log and keep going
                self._log.exception("Could not deliver message to chat %s", chat_id)
                failed.append(chat_id)
        return failed

    def alert(self, text, chat_ids=None, *args, **kwargs):
        """Send *text* and repeat it every ``ALERT_INTERVAL`` seconds until /read.

        Args:
            text: Message to send.
            chat_ids: Recipients; ``None`` means all subscribers.  An empty
                collection means nobody, matching ``broadcast_msg``.
            *args, **kwargs: Passed to ``bot.send_message`` for the alert text.
        """
        def alert_wrapper(context):
            payload = context.job.context
            self.updater.bot.send_message(payload["id"], payload["text"], *args, **kwargs)

        recipients = list(self.subscriber) if chat_ids is None else list(chat_ids)
        for chat_id in recipients:
            job_name = f"alert_{chat_id}"
            try:
                self.updater.bot.send_message(chat_id=chat_id, text="Use /read to stop alert message.")
                self.updater.bot.send_message(chat_id, text, *args, **kwargs)
            except Exception:  # skip unreachable chats instead of aborting
                self._log.exception("Could not deliver alert to chat %s", chat_id)
                continue
            # Replace a still-running alert rather than stacking a second one.
            self.remove_job_if_exists(job_name)
            self.updater.dispatcher.job_queue.run_repeating(
                alert_wrapper,
                interval=self.ALERT_INTERVAL,
                context={"id": chat_id, "text": text},
                name=job_name,
            )

    def filter_text(self, text, ignore_case=True):
        """Return chat ids whose filter list is empty or has a keyword in *text*.

        Args:
            text: Text to match keywords against (substring match).
            ignore_case: Compare case-insensitively when ``True``.
        """
        haystack = text.lower() if ignore_case else text

        def matches(keywords):
            if not keywords:
                return True  # no filter configured: receive everything
            return any(
                (kw.lower() if ignore_case else kw) in haystack for kw in keywords
            )

        return [
            chat_id
            for chat_id, keywords in list(self.subscriber.items())
            if matches(keywords)
        ]

    def stop(self):
        """Persist state and stop the updater."""
        self.save()
        self.updater.stop()

    def start(self):
        """Start polling Telegram for updates."""
        self.updater.start_polling()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment