Skip to content

Instantly share code, notes, and snippets.

@Olegt0rr
Created September 8, 2022 19:48
Show Gist options
  • Save Olegt0rr/e97f3c48eaeda2368702aff1a8fed7f6 to your computer and use it in GitHub Desktop.
Save Olegt0rr/e97f3c48eaeda2368702aff1a8fed7f6 to your computer and use it in GitHub Desktop.
RetryAfter backoff middleware for aiogram
import asyncio
import logging
from typing import Awaitable, Callable, Optional
from aiogram import Bot
from aiogram.client.session.middlewares.base import (
BaseRequestMiddleware,
NextRequestMiddlewareType,
)
from aiogram.exceptions import TelegramRetryAfter
from aiogram.methods import Response, TelegramMethod
from aiogram.methods.base import TelegramType
class RetryAfterMiddleware(BaseRequestMiddleware):
    """Backoff middleware for RetryAfter exception.

    Re-issues a request that failed with ``TelegramRetryAfter`` after
    sleeping for the ``retry_after`` delay carried by the exception.
    Retrying stops once ``max_tries`` attempts have been made (when a
    limit is configured), at which point the middleware either re-raises
    the exception, delegates to ``on_giveup``, or returns a failed
    ``Response``.
    """

    def __init__(
        self,
        max_tries: Optional[int] = None,
        on_giveup: Optional[Callable[[], Awaitable[Response[TelegramType]]]] = None,
        raise_on_giveup: bool = True,
        logger: Optional[logging.Logger] = None,
    ):
        """Configure the backoff policy.

        Args:
            max_tries: Maximum total number of attempts (the first call
                included). ``None`` means retry without limit.
            on_giveup: Zero-argument coroutine function awaited when the
                middleware gives up; its result is returned to the caller.
                Only allowed together with ``raise_on_giveup=False``.
            raise_on_giveup: Re-raise the last ``TelegramRetryAfter`` when
                the middleware gives up.
            logger: Logger for debug messages; defaults to the module logger.

        Raises:
            RuntimeError: If ``on_giveup`` is passed while
                ``raise_on_giveup`` is enabled (the callback could never run).
        """
        if raise_on_giveup and on_giveup is not None:
            raise RuntimeError("Can't call `on_giveup` with enabled `raise_on_giveup`")

        self._max_tries = max_tries
        self._on_giveup = on_giveup
        self._raise_on_giveup = raise_on_giveup
        self._log = logger or logging.getLogger(__name__)

    async def __call__(
        self,
        make_request: NextRequestMiddlewareType[TelegramType],
        bot: Bot,
        method: TelegramMethod[TelegramType],
        retry_counter: int = 0,
    ) -> Response[TelegramType]:
        """Perform the request, retrying on ``TelegramRetryAfter``.

        Args:
            make_request: Next handler in the request middleware chain.
            bot: Bot instance forwarded to ``make_request``.
            method: Method forwarded to ``make_request``.
            retry_counter: Number of retries already performed before this
                call; kept for backward compatibility, normally left at 0.

        Returns:
            The response of the first successful attempt, or the give-up
            response produced by ``_give_up``.

        Raises:
            TelegramRetryAfter: When the attempt limit is reached and
                ``raise_on_giveup`` is enabled.
        """
        # Iterate instead of recursing so that an unlimited number of
        # retries cannot grow the coroutine call stack without bound.
        while True:
            try:
                return await make_request(bot, method)
            except TelegramRetryAfter as exc:
                giveup = await self._give_up(exc, retry_counter)
                if giveup is not None:
                    return giveup

                self._log.debug(
                    "TelegramRetryAfter backoff will call the %s again in %s seconds. "
                    "Exception: %s",
                    type(method).__name__,
                    exc.retry_after,
                    exc.message,
                )
                await asyncio.sleep(exc.retry_after)
            retry_counter += 1

    async def _give_up(
        self,
        exc: TelegramRetryAfter,
        retry_counter: int,
    ) -> Optional[Response[TelegramType]]:
        """Check `max try` value is reached.

        Args:
            exc: The exception raised by the attempt that just failed.
            retry_counter: Number of retries performed before that attempt.

        Returns:
            ``None`` when another attempt is allowed; otherwise the
            response to hand back to the caller instead of retrying.

        Raises:
            TelegramRetryAfter: ``exc`` itself, when the limit is reached
                and ``raise_on_giveup`` is enabled.
        """
        if self._max_tries is None:
            return None

        # The attempt that just failed is number ``retry_counter + 1``.
        attempts = retry_counter + 1
        if attempts < self._max_tries:
            return None

        self._log.debug(
            "TelegramRetryAfter backoff gave up after %s attempts. Exception: %s",
            attempts,
            exc.message,
        )

        if self._raise_on_giveup:
            # Explicit re-raise: does not depend on the caller still being
            # inside its ``except`` block.
            raise exc

        if self._on_giveup is not None:
            result = await self._on_giveup()
            # A callback that returns nothing must not be mistaken for
            # "keep retrying"; fall through to the failed response instead.
            if result is not None:
                return result

        # Return an instance (not the parametrized class) describing the failure.
        return Response[None](ok=False, description=exc.message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment