Last active
June 5, 2022 01:46
-
-
Save 1ort/bd42ba867afcc147d63cff7ee4c5d591 to your computer and use it in GitHub Desktop.
broadcaster.py - модуль для рассылок по базе бота
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from aiogram.utils import exceptions | |
from aiogram.types.chat import Chat | |
from datetime import datetime | |
from aiogram import Bot | |
from string import Template | |
from collections.abc import Iterable | |
from abc import ABC, abstractmethod | |
async def pass_handler(*args, **kwargs): | |
pass | |
class UnsupportedException(Exception): | |
pass | |
class Broadcaster: | |
def __init__(self): | |
self.handlable_exceptions = ( | |
exceptions.BotBlocked, exceptions.ChatNotFound, exceptions.UserDeactivated, exceptions.TelegramAPIError) | |
self.handlers = { | |
exceptions.BotBlocked: [pass_handler], | |
exceptions.ChatNotFound: [pass_handler], | |
exceptions.UserDeactivated: [pass_handler], | |
exceptions.TelegramAPIError: [pass_handler] | |
} | |
self.success_handlers = [pass_handler] | |
self.broadcasts = [] | |
self.saved_messages = {} | |
def save_message(self, message, save_id=None): | |
if not save_id: | |
save_id = message.chat.id | |
self.saved_messages[save_id] = message | |
def get_saved_message(self, save_id=None): | |
if not save_id: | |
save_id = Chat.get_current().id | |
message = self.saved_messages.get(save_id) | |
if not message: | |
raise KeyError('No message passed and no saved message for this save_id or wrong save_id') | |
return message | |
def get_timeout(self): | |
return len(self.broadcasts) * 0.05 | |
def exception_handler(self, *exception_types): | |
def register_handler(handler): | |
self.register_exception_handler(handler, *exception_types) | |
return handler | |
return register_handler | |
def register_exception_handler(self, handler, *exception_types): | |
for exception_type in exception_types: | |
if exception_type not in self.handlers: | |
raise UnsupportedException(f'Unsupported exception to handle: {exception_type}') | |
if exception_type in self.handlers: | |
self.handlers[exception_type].append(handler) | |
else: | |
self.handlers[exception_type] = [handler] | |
return handler | |
async def call_exception_handlers(self, exception_type, user_id): | |
if exception_type in self.handlers: | |
for handler in self.handlers[exception_type]: | |
await handler(user_id, exception_type) | |
# For beautiful decorator | |
def success_handler(self, handler): | |
return self.register_success_handler(handler) | |
# For direct call | |
def register_success_handler(self, handler): | |
self.success_handlers.append(handler) | |
return handler | |
async def call_success(self, message, results): | |
for handler in self.success_handlers: | |
await handler(message, results) | |
def message_broadcast(self, id_list, message=None, save_id=None, disable_notification=False, reply_markup=None): | |
broadcast = MessageBroadcast(self, message or self.get_saved_message(save_id), id_list, disable_notification, | |
reply_markup) | |
self.broadcasts.append(broadcast) | |
return broadcast | |
def template_broadcast(self, user_list, message=None, save_id=None, disable_notification=False, reply_markup=None): | |
broadcast = TemplateBroadcast(self, message or self.get_saved_message(save_id), user_list, disable_notification, | |
reply_markup) | |
self.broadcasts.append(broadcast) | |
return broadcast | |
def forward_broadcast(self, id_list, message=None, save_id=None, disable_notification=False, reply_markup=None): | |
broadcast = ForwardBroadcast(self, message or self.get_saved_message(save_id), id_list, disable_notification, | |
reply_markup) | |
self.broadcasts.append(broadcast) | |
return broadcast | |
class BaseBroadcast(ABC): | |
def __init__(self, dispatcher, message, user_list, disable_notification, reply_markup): | |
self.active = False | |
self.dispatcher = dispatcher | |
self.message = message | |
self.user_list = user_list | |
self.disable_notification = disable_notification | |
self.reply_markup = reply_markup | |
self.results = { | |
exceptions.BotBlocked: 0, | |
exceptions.ChatNotFound: 0, | |
exceptions.UserDeactivated: 0, | |
exceptions.TelegramAPIError: 0, | |
'success': 0, | |
'total': 0 | |
} | |
async def send_message(self, user): | |
try: | |
await self._dispatch(user) | |
except exceptions.RetryAfter as e: | |
await asyncio.sleep(e.timeout) | |
return await self.send_message(user) # Recursive call | |
except self.dispatcher.handlable_exceptions as e: | |
if isinstance(e, Iterable): | |
await self.dispatcher.call_exception_handlers(e.__class__, user['user_id']) | |
else: | |
await self.dispatcher.call_exception_handlers(e.__class__, user) | |
self.results[e.__class__] += 1 | |
else: | |
self.results['success'] += 1 | |
return True | |
return False | |
def get_results(self): | |
results = { | |
'timedelta': str(self.finish_time - self.start_time).split('.')[0], | |
'blocked': self.results[exceptions.BotBlocked], | |
'not_found': self.results[exceptions.ChatNotFound], | |
'deactivated': self.results[exceptions.UserDeactivated], | |
'errors': self.results[exceptions.TelegramAPIError], | |
'success': self.results['success'], | |
'total': self.results['total'] | |
} | |
return results | |
async def run(self): | |
self.active = True | |
self.start_time = datetime.now() | |
for user in self.user_list: | |
if self.active: | |
self.results['total'] += 1 | |
await self.send_message(user) | |
await asyncio.sleep(self.dispatcher.get_timeout()) | |
else: | |
break | |
self.dispatcher.broadcasts.remove(self) | |
self.finish_time = datetime.now() | |
await self.dispatcher.call_success(self.message, self.get_results()) | |
async def stop(self): | |
self.active = False | |
@abstractmethod | |
async def _dispatch(self, user): | |
pass | |
class MessageBroadcast(BaseBroadcast): | |
def __init__(self, dispatcher, message, user_list, disable_notification, reply_markup): | |
super().__init__(dispatcher, message, user_list, disable_notification, reply_markup) | |
# self._dispatch = self._copy_message | |
async def _dispatch(self, user_id): | |
await self.message.send_copy(user_id, disable_notification=self.disable_notification, | |
reply_markup=self.reply_markup) | |
class ForwardBroadcast(BaseBroadcast): | |
def __init__(self, dispatcher, message, user_list, disable_notification, reply_markup): | |
super().__init__(dispatcher, message, user_list, disable_notification, reply_markup) | |
# self._dispatch = self._forward_message | |
async def _dispatch(self, user_id): | |
await self.message.forward(user_id, disable_notification=self.disable_notification) | |
class TextBroadcast(BaseBroadcast): | |
def __init__(self, dispatcher, message, user_list, disable_notification, reply_markup): | |
super().__init__(dispatcher, message, user_list, disable_notification, reply_markup) | |
self.bot = Bot.get_current() | |
async def _dispatch(self, user_id): | |
await self.bot.send_message(user_id, self.message, disable_notification=self.disable_notification, | |
reply_markup=self.reply_markup) | |
class TemplateBroadcast(BaseBroadcast): | |
def __init__(self, dispatcher, message, user_list, disable_notification, reply_markup): | |
super().__init__(dispatcher, message, user_list, disable_notification, reply_markup) | |
self.template = Template(self.message.html_text) | |
# self._dispatch = self._send_formatted | |
self.bot = Bot.get_current() | |
async def _dispatch(self, user_dict): | |
message_text = self.template.safe_substitute(**user_dict) | |
await self._send_with_caption(user_dict.get('user_id'), message_text) | |
async def _send_with_caption(self, user_id, text): | |
message = self.message | |
kwargs = { | |
'chat_id': user_id, | |
'disable_notification': self.disable_notification, | |
'reply_markup': self.reply_markup | |
} | |
if message.text: | |
await self.bot.send_message(text=text, **kwargs) | |
return | |
elif message.audio: | |
await self.bot.send_audio(audio=message.audio.file_id, | |
caption=text, | |
title=message.audio.title, | |
performer=message.audio.performer, | |
duration=message.audio.duration, | |
**kwargs | |
) | |
return | |
elif message.animation: | |
await self.bot.send_animation( | |
animation=message.animation.file_id, caption=text, **kwargs | |
) | |
return | |
elif message.document: | |
await self.bot.send_document( | |
document=message.document.file_id, caption=text, **kwargs | |
) | |
return | |
elif message.photo: | |
await self.bot.send_photo( | |
photo=message.photo[-1].file_id, caption=text, **kwargs | |
) | |
return | |
else: | |
await message.copy_to(**kwargs) | |
return |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment