Skip to content

Instantly share code, notes, and snippets.

@gabbhack

gabbhack/main.py Secret

Created February 10, 2020 14:18
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gabbhack/b536b08706228f6eb161af6d9ec5ba36 to your computer and use it in GitHub Desktop.
Save gabbhack/b536b08706228f6eb161af6d9ec5ba36 to your computer and use it in GitHub Desktop.
aiogram webhook multibot
import logging
from aiogram import Bot, types
from aiogram.contrib.middlewares.logging import LoggingMiddleware
from aiogram.dispatcher import Dispatcher
from aiogram.dispatcher.webhook import SendMessage
from aiogram.utils.executor import Executor
from aiogram.dispatcher.webhook import WebhookRequestHandler
class MultiBotWebhookRequestHandler(WebhookRequestHandler):
    """Webhook request handler that serves several bots with one dispatcher.

    The bot token is read from the ``token`` entry of the request's URL match
    info and is made the active token of the dispatcher's bot while the
    update is being processed.
    """

    async def post(self):
        """Process one incoming update and build the HTTP reply.

        Returns:
            The web response built from the handler results, or a plain
            ``"ok"`` response when the handlers produced no response object.
            A ``Retry-After`` header is added when the application stores a
            truthy ``RETRY_AFTER`` value.
        """
        # ``web`` (aiohttp.web) was used here without ever being imported,
        # which raised NameError whenever no response object was produced.
        # It is taken from the aiogram 2.x module that defines the base
        # handler, so no package beyond the file's existing ones is needed.
        from aiogram.dispatcher.webhook import web

        self.validate_ip()

        dispatcher = self.get_dispatcher()
        update = await self.parse_update(dispatcher.bot)

        # Switch tokens on the dispatcher's own bot instead of relying on a
        # module-level ``bot`` global being defined by the importing script.
        # NOTE(review): the token comes straight from the request URL; confirm
        # that only expected tokens can reach this handler.
        with dispatcher.bot.with_token(self.request.match_info["token"]):
            results = await self.process_update(update)

        response = self.get_response(results)
        if response:
            web_response = response.get_web_response()
        else:
            web_response = web.Response(text="ok")

        retry_after = self.request.app.get("RETRY_AFTER", None)
        if retry_after:
            web_response.headers["Retry-After"] = retry_after

        return web_response
# Placeholder token of the primary bot.
API_TOKEN = "BOT_TOKEN_HERE"

# Webhook settings. The "{token}" placeholder is carried over verbatim into
# WEBHOOK_URL so it can be filled in later with str.format.
WEBHOOK_HOST = "https://your.domain"
WEBHOOK_PATH = "/path/to/api/{token}"
WEBHOOK_URL = WEBHOOK_HOST + WEBHOOK_PATH

# Web server settings.
WEBAPP_HOST = "localhost"  # or ip
WEBAPP_PORT = 3001
# Emit log records of level INFO and above.
logging.basicConfig(level=logging.INFO)

# A single Bot / Dispatcher / Executor trio is shared by every token that
# this script serves.
bot = Bot(API_TOKEN)
dp = Dispatcher(bot=bot)
executor = Executor(dispatcher=dp)

# Attach the logging middleware to the dispatcher.
dp.middleware.setup(LoggingMiddleware())
@dp.message_handler()
async def echo(message: types.Message):
    """Echo the text of an incoming message back to its chat.

    The reply is returned as a ``SendMessage`` object (reply INTO webhook)
    instead of being sent with a regular request such as
    ``await bot.send_message(message.chat.id, message.text)``.
    """
    chat_id = message.chat.id
    text = message.text
    return SendMessage(chat_id, text)
async def on_startup(dp):
    """Set the webhook URL for every bot token served by this script.

    Args:
        dp: Dispatcher argument supplied to the startup callback (unused
            here; the module-level ``bot`` is used instead).
    """
    # Primary bot: uses the token the Bot instance was created with.
    await bot.set_webhook(WEBHOOK_URL.format(token=API_TOKEN))

    # Additional bots: switch the shared Bot to each token in turn and set
    # the webhook URL that embeds that same token.
    for extra_token in ("another_token", "yet_another_token"):
        with bot.with_token(extra_token):
            await bot.set_webhook(WEBHOOK_URL.format(token=extra_token))
async def on_shutdown(dp):
    """Delete every bot's webhook and close the dispatcher storage.

    Args:
        dp: Dispatcher whose ``storage`` is closed once the webhooks are
            deleted.
    """
    logging.warning('Shutting down..')

    # insert code here to run it before shutdown

    # Remove webhook (not acceptable in some cases): first for the primary
    # token, then for each additional token.
    await bot.delete_webhook()
    for extra_token in ("another_token", "yet_another_token"):
        with bot.with_token(extra_token):
            await bot.delete_webhook()

    # Close DB connection (if used)
    storage = dp.storage
    await storage.close()
    await storage.wait_closed()

    logging.warning('Bye!')
if __name__ == '__main__':
    # Register the lifecycle callbacks on the executor itself.
    # Executor.start_webhook (aiogram 2.x) forwards every extra keyword
    # argument to aiohttp's web.run_app, which has no on_startup/on_shutdown
    # parameters, so passing them there fails with TypeError.
    executor.on_startup(on_startup)
    executor.on_shutdown(on_shutdown)

    # host and port are the keyword arguments forwarded to the web server.
    executor.start_webhook(
        webhook_path=WEBHOOK_PATH,
        request_handler=MultiBotWebhookRequestHandler,
        host=WEBAPP_HOST,
        port=WEBAPP_PORT,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment