-
-
Save gabbhack/b536b08706228f6eb161af6d9ec5ba36 to your computer and use it in GitHub Desktop.
aiogram webhook multibot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from aiogram import Bot, types | |
from aiogram.contrib.middlewares.logging import LoggingMiddleware | |
from aiogram.dispatcher import Dispatcher | |
from aiogram.dispatcher.webhook import SendMessage | |
from aiogram.utils.executor import Executor | |
from aiogram.dispatcher.webhook import WebhookRequestHandler | |
class MultiBotWebhookRequestHandler(WebhookRequestHandler):
    """Webhook request handler that serves several bots with one dispatcher.

    The webhook route must contain a ``{token}`` placeholder. The value
    matched for it is used as the bot token while the incoming update is
    being handled, so replies go out on behalf of the addressed bot.
    """

    async def post(self):
        """Handle one webhook POST on behalf of the bot named in the URL.

        :return: the response produced by ``WebhookRequestHandler.post``
        """
        # Reject disallowed source addresses before the token is looked at,
        # keeping the original order of checks.
        self.validate_ip()

        # Use the dispatcher's own bot instead of a module-level ``bot``
        # global, so the handler does not depend on import-time ordering.
        bot = self.get_dispatcher().bot
        token = self.request.match_info["token"]

        with bot.with_token(token):
            # Delegating to the parent avoids re-implementing its flow here;
            # the previous copy referenced an un-imported ``web`` module and
            # raised NameError whenever a handler produced no webhook reply.
            # NOTE(review): this assumes the parent ``post`` performs the same
            # parse -> process -> respond steps (including the "ok" fallback
            # and the Retry-After header) as in aiogram 2.x - confirm against
            # the installed version.
            return await super().post()
# Token of the primary bot (placeholder value, replace before running).
API_TOKEN = 'BOT_TOKEN_HERE'

# Webhook settings.
# WEBHOOK_PATH keeps a literal "{token}" placeholder; it is filled in later
# with str.format() once per bot.
WEBHOOK_HOST = 'https://your.domain'
WEBHOOK_PATH = '/path/to/api/{token}'
WEBHOOK_URL = WEBHOOK_HOST + WEBHOOK_PATH

# Web server settings.
WEBAPP_HOST = 'localhost'  # or ip
WEBAPP_PORT = 3001
# Log at INFO level and above.
logging.basicConfig(level=logging.INFO)

# One Bot / Dispatcher / Executor trio is shared by every token served here.
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
executor = Executor(dp)

# Install aiogram's logging middleware on the dispatcher.
dp.middleware.setup(LoggingMiddleware())
@dp.message_handler()
async def echo(message: types.Message):
    """Echo the incoming message text back to the chat it came from.

    The reply is returned as a ``SendMessage`` object, i.e. answered INTO the
    webhook. The regular-request alternative would be
    ``await bot.send_message(message.chat.id, message.text)``.
    """
    reply = SendMessage(message.chat.id, message.text)
    return reply
async def on_startup(dp):
    """Register the webhook URL of every bot served by this application.

    :param dp: dispatcher handed to the callback (not used here)
    """
    # Primary bot: uses the token the Bot instance was created with.
    await bot.set_webhook(WEBHOOK_URL.format(token=API_TOKEN))

    # Additional bots: switch the token for the duration of each call.
    for extra_token in ("another_token", "yet_another_token"):
        with bot.with_token(extra_token):
            await bot.set_webhook(WEBHOOK_URL.format(token=extra_token))
async def on_shutdown(dp):
    """Remove all webhooks and close the dispatcher storage before exit.

    :param dp: dispatcher whose storage is closed
    """
    logging.warning('Shutting down..')

    # Insert code here to run it before shutdown.

    # Remove webhooks (not acceptable in some cases): primary bot first,
    # then each additional token.
    await bot.delete_webhook()
    for extra_token in ("another_token", "yet_another_token"):
        with bot.with_token(extra_token):
            await bot.delete_webhook()

    # Close DB connection (if used).
    storage = dp.storage
    await storage.close()
    await storage.wait_closed()

    logging.warning('Bye!')
if __name__ == '__main__':
    # Lifecycle callbacks are registered on the executor itself.
    # NOTE(review): in aiogram 2.x, Executor.start_webhook forwards its extra
    # keyword arguments to the aiohttp app runner, which does not accept
    # on_startup / on_shutdown, so passing them there raises TypeError -
    # confirm against the installed version.
    executor.on_startup(on_startup)
    executor.on_shutdown(on_shutdown)

    # Serve the "{token}"-parameterised route with the multibot handler.
    executor.start_webhook(
        webhook_path=WEBHOOK_PATH,
        request_handler=MultiBotWebhookRequestHandler,
        host=WEBAPP_HOST,
        port=WEBAPP_PORT,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment