Skip to content

Instantly share code, notes, and snippets.

@1ort
Last active June 5, 2022 01:46
Show Gist options
  • Save 1ort/bd42ba867afcc147d63cff7ee4c5d591 to your computer and use it in GitHub Desktop.
Save 1ort/bd42ba867afcc147d63cff7ee4c5d591 to your computer and use it in GitHub Desktop.
broadcaster.py - модуль для рассылок по базе бота
import asyncio
from aiogram.utils import exceptions
from aiogram.types.chat import Chat
from datetime import datetime
from aiogram import Bot
from string import Template
from collections.abc import Iterable
from abc import ABC, abstractmethod
async def pass_handler(*args, **kwargs):
    """Accept any positional/keyword arguments and do nothing.

    Serves as a no-op coroutine that can be awaited wherever a handler
    is expected.
    """
    return None
class UnsupportedException(Exception):
    """Error signalling that an unsupported exception type was supplied."""
class Broadcaster:
    """Registry of broadcasts, their handlers and saved messages.

    Holds the exception handlers and success handlers that broadcasts
    call back into, the list of currently registered broadcasts, and a
    store of messages saved for later broadcasting.
    """

    def __init__(self):
        # Exception types a broadcast catches and reports through handlers.
        self.handlable_exceptions = (
            exceptions.BotBlocked,
            exceptions.ChatNotFound,
            exceptions.UserDeactivated,
            exceptions.TelegramAPIError,
        )
        # Every supported exception type starts with a no-op handler.
        self.handlers = {
            exceptions.BotBlocked: [pass_handler],
            exceptions.ChatNotFound: [pass_handler],
            exceptions.UserDeactivated: [pass_handler],
            exceptions.TelegramAPIError: [pass_handler],
        }
        self.success_handlers = [pass_handler]
        self.broadcasts = []
        self.saved_messages = {}

    def save_message(self, message, save_id=None):
        """Store ``message`` under ``save_id``.

        When ``save_id`` is ``None`` the id of the message's chat is used
        as the key. Any other value (including falsy ones such as ``0``)
        is used as given.
        """
        if save_id is None:
            save_id = message.chat.id
        self.saved_messages[save_id] = message

    def get_saved_message(self, save_id=None):
        """Return the message stored under ``save_id``.

        When ``save_id`` is ``None`` the id of the current chat
        (``Chat.get_current()``) is used as the key.

        Raises:
            KeyError: if there is no current chat to derive the key from,
                or nothing is stored under the key.
        """
        if save_id is None:
            current_chat = Chat.get_current()
            if current_chat is None:
                raise KeyError('No message passed and no saved message for this save_id or wrong save_id')
            save_id = current_chat.id
        message = self.saved_messages.get(save_id)
        if message is None:
            raise KeyError('No message passed and no saved message for this save_id or wrong save_id')
        return message

    def get_timeout(self):
        """Return the per-message delay: 0.05 s for each registered broadcast."""
        return len(self.broadcasts) * 0.05

    def exception_handler(self, *exception_types):
        """Decorator form of :meth:`register_exception_handler`."""
        def register_handler(handler):
            self.register_exception_handler(handler, *exception_types)
            return handler
        return register_handler

    def register_exception_handler(self, handler, *exception_types):
        """Append ``handler`` to the handler list of each given exception type.

        Types are processed in order; handlers already appended for earlier
        types stay registered if a later type is rejected.

        Returns:
            The ``handler`` itself, also when no exception types are given.

        Raises:
            UnsupportedException: if a type is not a key of ``self.handlers``.
        """
        for exception_type in exception_types:
            if exception_type not in self.handlers:
                raise UnsupportedException(f'Unsupported exception to handle: {exception_type}')
            self.handlers[exception_type].append(handler)
        return handler

    async def call_exception_handlers(self, exception_type, user_id):
        """Await every handler registered for ``exception_type``.

        Each handler is called as ``handler(user_id, exception_type)``.
        Types without registered handlers are silently ignored.
        """
        for handler in self.handlers.get(exception_type, ()):
            await handler(user_id, exception_type)

    # Decorator-friendly alias
    def success_handler(self, handler):
        """Decorator form of :meth:`register_success_handler`."""
        return self.register_success_handler(handler)

    # For direct calls
    def register_success_handler(self, handler):
        """Append ``handler`` to the success handlers and return it."""
        self.success_handlers.append(handler)
        return handler

    async def call_success(self, message, results):
        """Await every success handler as ``handler(message, results)``."""
        for handler in self.success_handlers:
            await handler(message, results)

    def _register(self, broadcast_cls, recipients, message, save_id, disable_notification, reply_markup):
        """Build a ``broadcast_cls`` instance and add it to ``self.broadcasts``."""
        # Fall back to the saved message when none is passed explicitly.
        source = message or self.get_saved_message(save_id)
        broadcast = broadcast_cls(self, source, recipients, disable_notification, reply_markup)
        self.broadcasts.append(broadcast)
        return broadcast

    def message_broadcast(self, id_list, message=None, save_id=None, disable_notification=False, reply_markup=None):
        """Create and register a ``MessageBroadcast`` for ``id_list``."""
        return self._register(MessageBroadcast, id_list, message, save_id, disable_notification, reply_markup)

    def template_broadcast(self, user_list, message=None, save_id=None, disable_notification=False, reply_markup=None):
        """Create and register a ``TemplateBroadcast`` for ``user_list``."""
        return self._register(TemplateBroadcast, user_list, message, save_id, disable_notification, reply_markup)

    def forward_broadcast(self, id_list, message=None, save_id=None, disable_notification=False, reply_markup=None):
        """Create and register a ``ForwardBroadcast`` for ``id_list``."""
        return self._register(ForwardBroadcast, id_list, message, save_id, disable_notification, reply_markup)
class BaseBroadcast(ABC):
    """Common machinery for sending one message to a list of users.

    Subclasses implement :meth:`_dispatch`, which performs the actual
    delivery to a single user. This class iterates the user list, handles
    retry/exception bookkeeping and reports the results to the dispatcher.
    """

    def __init__(self, dispatcher, message, user_list, disable_notification, reply_markup):
        self.active = False
        self.dispatcher = dispatcher
        self.message = message
        self.user_list = user_list
        self.disable_notification = disable_notification
        self.reply_markup = reply_markup
        # Set by run(); None until the broadcast has started / finished.
        self.start_time = None
        self.finish_time = None
        # Per-outcome counters; exception classes and two string keys.
        self.results = {
            exceptions.BotBlocked: 0,
            exceptions.ChatNotFound: 0,
            exceptions.UserDeactivated: 0,
            exceptions.TelegramAPIError: 0,
            'success': 0,
            'total': 0
        }

    @staticmethod
    def _extract_user_id(user):
        """Return ``user['user_id']`` for container-like users, else ``user``."""
        # Strings are iterable too, but a string user is already an id.
        if isinstance(user, Iterable) and not isinstance(user, (str, bytes)):
            return user['user_id']
        return user

    @staticmethod
    def _match_known(exc_type, known):
        """Return the nearest class in ``exc_type``'s MRO that is in ``known``.

        Falls back to ``exc_type`` itself when no ancestor is known.
        """
        for candidate in exc_type.__mro__:
            if candidate in known:
                return candidate
        return exc_type

    async def send_message(self, user):
        """Deliver to one user, updating the result counters.

        On ``RetryAfter`` the method sleeps for the requested timeout and
        tries again. Any other handlable exception is reported to the
        dispatcher's exception handlers and counted.

        Returns:
            ``True`` if the message was delivered, ``False`` otherwise.
        """
        while True:
            try:
                await self._dispatch(user)
            except exceptions.RetryAfter as e:
                # Flood control: wait as long as asked, then retry.
                await asyncio.sleep(e.timeout)
            except self.dispatcher.handlable_exceptions as e:
                # A caught exception may be a subclass of a tracked type;
                # account for it under the closest tracked ancestor.
                key = self._match_known(type(e), self.results)
                await self.dispatcher.call_exception_handlers(key, self._extract_user_id(user))
                self.results[key] = self.results.get(key, 0) + 1
                return False
            else:
                self.results['success'] += 1
                return True

    def get_results(self):
        """Return a summary dict of the broadcast counters.

        ``timedelta`` is the elapsed time as a string without the
        fractional seconds. Before the broadcast has started it is zero;
        while running it is measured up to now.
        """
        finished = self.finish_time or datetime.now()
        started = self.start_time or finished
        results = {
            'timedelta': str(finished - started).split('.')[0],
            'blocked': self.results[exceptions.BotBlocked],
            'not_found': self.results[exceptions.ChatNotFound],
            'deactivated': self.results[exceptions.UserDeactivated],
            'errors': self.results[exceptions.TelegramAPIError],
            'success': self.results['success'],
            'total': self.results['total']
        }
        return results

    async def run(self):
        """Send to every user in ``user_list`` until done or stopped.

        Between sends the method sleeps for ``dispatcher.get_timeout()``.
        Afterwards the broadcast is removed from the dispatcher and the
        dispatcher's success handlers are awaited with the results.
        """
        self.active = True
        self.start_time = datetime.now()
        self.finish_time = None
        try:
            for user in self.user_list:
                if not self.active:
                    break
                self.results['total'] += 1
                await self.send_message(user)
                await asyncio.sleep(self.dispatcher.get_timeout())
        finally:
            # Always unregister, even when an unexpected error aborts the
            # loop, so a dead broadcast does not keep inflating the timeout.
            self.active = False
            if self in self.dispatcher.broadcasts:
                self.dispatcher.broadcasts.remove(self)
            self.finish_time = datetime.now()
        await self.dispatcher.call_success(self.message, self.get_results())

    async def stop(self):
        """Ask a running broadcast to stop before its next send."""
        self.active = False

    @abstractmethod
    async def _dispatch(self, user):
        """Deliver the message to a single ``user``; implemented by subclasses."""
        pass
class MessageBroadcast(BaseBroadcast):
    """Broadcast that sends each recipient a copy of the stored message.

    Construction is inherited unchanged from ``BaseBroadcast``.
    """

    async def _dispatch(self, user_id):
        """Send a copy of ``self.message`` to ``user_id``."""
        send_options = {
            'disable_notification': self.disable_notification,
            'reply_markup': self.reply_markup,
        }
        await self.message.send_copy(user_id, **send_options)
class ForwardBroadcast(BaseBroadcast):
    """Broadcast that forwards the stored message to each recipient.

    Construction is inherited unchanged from ``BaseBroadcast``. The
    ``reply_markup`` given at construction is stored but not used when
    forwarding.
    """

    async def _dispatch(self, user_id):
        """Forward ``self.message`` to ``user_id``."""
        silent = self.disable_notification
        await self.message.forward(user_id, disable_notification=silent)
class TextBroadcast(BaseBroadcast):
    """Broadcast that sends ``self.message`` through ``bot.send_message``.

    The bot is taken from ``Bot.get_current()`` at construction time.
    """

    def __init__(self, dispatcher, message, user_list, disable_notification, reply_markup):
        super().__init__(dispatcher, message, user_list, disable_notification, reply_markup)
        self.bot = Bot.get_current()

    async def _dispatch(self, user_id):
        """Send ``self.message`` to ``user_id`` with the stored options."""
        send_options = {
            'disable_notification': self.disable_notification,
            'reply_markup': self.reply_markup,
        }
        await self.bot.send_message(user_id, self.message, **send_options)
class TemplateBroadcast(BaseBroadcast):
    """Broadcast whose text/caption is a per-user ``string.Template``.

    The HTML rendering of the stored message is used as the template;
    each recipient is a mapping whose items fill the ``$placeholders`` and
    whose ``'user_id'`` item is the destination chat.
    """

    def __init__(self, dispatcher, message, user_list, disable_notification, reply_markup):
        super().__init__(dispatcher, message, user_list, disable_notification, reply_markup)
        # html_text is only read when there is text or a caption to render;
        # media without a caption gets an empty template instead.
        has_text = bool(self.message.text or self.message.caption)
        self.template = Template(self.message.html_text if has_text else '')
        self.bot = Bot.get_current()

    async def _dispatch(self, user_dict):
        """Render the template for one user and send the result."""
        # Unknown placeholders are left as-is by safe_substitute.
        message_text = self.template.safe_substitute(user_dict)
        await self._send_with_caption(user_dict.get('user_id'), message_text)

    async def _send_with_caption(self, user_id, text):
        """Send the stored message to ``user_id`` with ``text`` as its text/caption.

        Text, audio, animation, document and photo messages are re-sent via
        the matching bot method; anything else is copied with ``copy_to``.
        ``text`` is HTML, so it is always sent with ``parse_mode='HTML'``.
        """
        message = self.message
        common = {
            'chat_id': user_id,
            'disable_notification': self.disable_notification,
            'reply_markup': self.reply_markup
        }
        if message.text:
            await self.bot.send_message(text=text, parse_mode='HTML', **common)
            return

        # An empty caption is passed as None so no caption argument is sent.
        caption = {
            'caption': text or None,
            'parse_mode': 'HTML' if text else None
        }
        if message.audio:
            await self.bot.send_audio(
                audio=message.audio.file_id,
                title=message.audio.title,
                performer=message.audio.performer,
                duration=message.audio.duration,
                **caption,
                **common
            )
        elif message.animation:
            # Checked before document, same order as the other media checks.
            await self.bot.send_animation(
                animation=message.animation.file_id, **caption, **common
            )
        elif message.document:
            await self.bot.send_document(
                document=message.document.file_id, **caption, **common
            )
        elif message.photo:
            # The last photo size is used.
            await self.bot.send_photo(
                photo=message.photo[-1].file_id, **caption, **common
            )
        else:
            # Any other content type: copy it, overriding the caption with
            # the rendered one when there is one.
            await message.copy_to(**caption, **common)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment