Skip to content

Instantly share code, notes, and snippets.

@dong-zeyu
Last active June 15, 2023 17:36
Show Gist options
  • Save dong-zeyu/1f38a20b7b84b9775447aca1c6dd43a3 to your computer and use it in GitHub Desktop.
Save dong-zeyu/1f38a20b7b84b9775447aca1c6dd43a3 to your computer and use it in GitHub Desktop.
import json
import logging
import os

from telegram import Update
from telegram.ext import CallbackContext, CommandHandler, MessageHandler, Updater, Filters
class BroadcastBot:
    """Telegram bot that relays messages and repeating alerts to subscribers.

    Chats subscribe with ``/start`` (optionally gated by an access code),
    unsubscribe with ``/stop``, acknowledge a repeating alert with ``/read``
    and manage a per-chat keyword list with ``/filter``.  ``/admin <password>``
    grants admin status; a bare ``/admin`` revokes it.  Plain text messages
    are forwarded to every admin.

    State (``subscriber``: chat id -> keyword list, ``admins``: chat id list)
    is persisted as JSON in ``data_file``.
    """

    def __init__(self, token, data_file, admin_password, access_password=None):
        """Register the command handlers and load persisted state.

        Args:
            token: Bot token handed to ``Updater``.
            data_file: Path of the JSON file used by ``load``/``save``.
            admin_password: Password expected by ``/admin``.
            access_password: When not ``None``, new chats must send this
                code after ``/start`` before they are subscribed.
        """
        self.updater = Updater(token)
        dispatcher = self.updater.dispatcher
        dispatcher.add_handler(CommandHandler("start", self._start))
        dispatcher.add_handler(CommandHandler("stop", self._stop))
        dispatcher.add_handler(CommandHandler("read", self._read))
        dispatcher.add_handler(CommandHandler("filter", self._filter))
        dispatcher.add_handler(CommandHandler("admin", self._admin))
        dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, self._dump))
        # Log handler failures instead of discarding them silently.
        dispatcher.add_error_handler(self._on_error)
        self.data_file = data_file
        self.admin_password = admin_password
        # Always define these so no code path depends on how the bot was built.
        self.access_password = access_password
        self.auth = access_password is not None
        self.pending = set()  # chat ids that were asked for the access code
        self.load()

    def load(self):
        """Populate ``admins`` and ``subscriber`` from ``data_file``.

        Starts with empty state when the file does not exist.  JSON object
        keys are strings, so subscriber keys are converted back to ``int``.
        """
        if not os.path.isfile(self.data_file):
            self.admins = []
            self.subscriber = {}
            return
        with open(self.data_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.admins = list(data.get("admins", []))
        self.subscriber = {int(k): list(v) for k, v in data.get("subscriber", {}).items()}

    def save(self):
        """Write ``subscriber`` and ``admins`` to ``data_file`` as UTF-8 JSON.

        The data goes to a sibling ``.tmp`` file first and is then moved into
        place, so an interrupted write cannot leave a truncated data file.
        """
        tmp_path = f"{self.data_file}.tmp"
        # ensure_ascii=False emits raw non-ASCII keywords, so the encoding must
        # be explicit rather than whatever the locale default happens to be.
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump({"subscriber": self.subscriber, "admins": self.admins}, f, ensure_ascii=False)
        os.replace(tmp_path, self.data_file)

    def remove_job_if_exists(self, name) -> bool:
        """Schedule removal of every queued job called ``name``.

        Returns:
            ``True`` if at least one job was found, ``False`` otherwise.
        """
        current_jobs = self.updater.dispatcher.job_queue.get_jobs_by_name(name)
        if not current_jobs:
            return False
        for job in current_jobs:
            job.schedule_removal()
        return True

    def add_subscriber(self, update: Update):
        """Subscribe the chat of ``update`` with an empty filter list, persist
        the change, notify admins and confirm to the user."""
        chat_id = update.message.chat_id
        self.subscriber[chat_id] = []
        self.save()
        self.send_admin(f"New Subscriber {chat_id}")
        update.message.reply_text('Subscribed messages! Use /stop to unsubscribe.')

    def _start(self, update: Update, context: CallbackContext) -> None:
        """``/start``: subscribe the chat, or ask for the access code first."""
        chat_id = update.message.chat_id
        if chat_id in self.subscriber:
            update.message.reply_text('Already Subscribed! Use /stop to unsubscribe.')
        elif not self.auth:
            self.add_subscriber(update)
        else:
            # The next plain-text message from this chat is checked in _dump.
            self.pending.add(chat_id)
            update.message.reply_text('Please enter the access code:')

    def _stop(self, update: Update, context: CallbackContext) -> None:
        """``/stop``: unsubscribe the chat and cancel its repeating alert."""
        chat_id = update.message.chat_id
        if chat_id in self.subscriber:
            del self.subscriber[chat_id]
            self.save()
            self.remove_job_if_exists(f"alert_{chat_id}")
            update.message.reply_text('Unsubscribed. Use /start to subscribe again.')
        else:
            # Also abandon a pending access-code prompt, if there is one.
            self.pending.discard(chat_id)
            update.message.reply_text('Not subscribing. Use /start to subscribe.')

    def _read(self, update: Update, context: CallbackContext) -> None:
        """``/read``: cancel the repeating alert job of this chat."""
        chat_id = update.message.chat_id
        self.remove_job_if_exists(f"alert_{chat_id}")
        self.updater.bot.send_message(chat_id=chat_id, text="Stopped alerting message.")

    def _filter(self, update: Update, context: CallbackContext) -> None:
        """``/filter [kw ...]``: add keywords to the chat's filter list, or
        clear the list when no keyword is given."""
        chat_id = update.message.chat_id
        if chat_id not in self.subscriber:
            update.message.reply_text('Not subscribing. Use /start to subscribe.')
            return
        if not context.args:
            self.subscriber[chat_id] = []
            update.message.reply_text('Cleared filter list.')
        else:
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the stored list and the reply are deterministic.
            merged = list(dict.fromkeys([*self.subscriber[chat_id], *context.args]))
            self.subscriber[chat_id] = merged
            update.message.reply_text(f"Filter List: {'|'.join(merged)}")
        self.save()

    def _dump(self, update: Update, context: CallbackContext) -> None:
        """Plain text: treat it as an access-code attempt when the chat is
        pending, otherwise forward it to the admins."""
        chat_id = update.message.chat_id
        text = update.message.text
        if self.auth and chat_id in self.pending:
            if text == self.access_password:
                self.add_subscriber(update)
            else:
                update.message.reply_text('Wrong access code! This incident will be reported.')
                self.send_admin(f"[{chat_id}]: Wrong access code {text}")
            # One attempt per /start, whether it succeeded or not.
            self.pending.remove(chat_id)
            return
        self.send_admin(f"[{chat_id}]: {text}")

    def _admin(self, update: Update, context: CallbackContext) -> None:
        """``/admin``: with the correct password grant admin status to the
        chat; with no argument revoke it.  Anything else is ignored."""
        chat_id = update.message.chat_id
        args = context.args
        if not args:
            if chat_id in self.admins:
                update.message.reply_text("Revoke admin access!")
                self.admins.remove(chat_id)
                self.save()
        elif len(args) == 1 and args[0] == self.admin_password:
            if chat_id in self.admins:
                update.message.reply_text("Already have the admin access!")
            else:
                update.message.reply_text("Grant admin access!")
                # Existing admins are told before the new one joins the list.
                self.send_admin(f"Grant admin for {chat_id}")
                self.admins.append(chat_id)
                self.save()

    def _on_error(self, update, context) -> None:
        """Dispatcher error handler: log the failure and carry on."""
        logging.getLogger(__name__).warning(
            "Unhandled error while processing an update", exc_info=context.error
        )

    def send_admin(self, text, *args, **kwargs):
        """Send ``text`` to every admin; extra arguments are forwarded to
        ``broadcast_msg`` (and from there to ``send_message``)."""
        self.broadcast_msg(text, self.admins, *args, **kwargs)

    def broadcast_msg(self, text, chat_ids=None, *args, **kwargs):
        """Send ``text`` to ``chat_ids``, or to all subscribers when ``None``.

        Extra positional/keyword arguments are passed on to
        ``bot.send_message`` after ``chat_id`` and ``text``.
        """
        # Snapshot the recipients: handlers may add or remove subscribers or
        # admins while this loop is busy sending.
        recipients = list(self.subscriber if chat_ids is None else chat_ids)
        for chat_id in recipients:
            # chat_id/text go positionally so that *args cannot collide with them.
            self.updater.bot.send_message(chat_id, text, *args, **kwargs)

    def alert(self, text, chat_ids=None, *args, interval=5, **kwargs):
        """Send ``text`` and keep repeating it until the chat uses ``/read``.

        Args:
            text: Alert message.
            chat_ids: Recipients; ``None`` means every subscriber.  An empty
                collection means nobody (it does not fall back to everyone).
            interval: Repeat interval handed to ``job_queue.run_repeating``.
            *args, **kwargs: Forwarded to ``bot.send_message`` for the alert
                text (initial send and every repetition).
        """
        def alert_wrapper(context):
            payload = context.job.context
            self.updater.bot.send_message(payload["id"], payload["text"], *args, **kwargs)

        recipients = list(self.subscriber if chat_ids is None else chat_ids)
        job_queue = self.updater.dispatcher.job_queue
        for chat_id in recipients:
            self.updater.bot.send_message(chat_id=chat_id, text="Use /read to stop alert message.")
            self.updater.bot.send_message(chat_id, text, *args, **kwargs)
            # The job name is what /read and /stop look up to cancel the alert.
            job_queue.run_repeating(
                alert_wrapper,
                interval=interval,
                context={"id": chat_id, "text": text},
                name=f"alert_{chat_id}",
            )

    def filter_text(self, text, ignore_case=True):
        """Return the chat ids whose filter list matches ``text``.

        A chat matches when its keyword list is empty or when any keyword is
        a substring of ``text`` (compared lower-cased if ``ignore_case``).
        """
        if ignore_case:
            text = text.lower()

        def matches(kw_list):
            if not kw_list:
                return True
            if ignore_case:
                return any(kw.lower() in text for kw in kw_list)
            return any(kw in text for kw in kw_list)

        return [chat_id for chat_id, kw_list in self.subscriber.items() if matches(kw_list)]

    def stop(self):
        """Persist state and stop the updater."""
        self.save()
        self.updater.stop()

    def start(self):
        """Start polling Telegram for updates."""
        self.updater.start_polling()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment