Skip to content

Instantly share code, notes, and snippets.

@stoensin
Last active March 31, 2020 08:30
Show Gist options
  • Save stoensin/9784d8bb377a27991853f2e743906d82 to your computer and use it in GitHub Desktop.
Save stoensin/9784d8bb377a27991853f2e743906d82 to your computer and use it in GitHub Desktop.
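'''
1. The on_exception decorator retries when a specified exception is raised. For example, retry with exponential backoff (backoff.expo, i.e. the wait time grows exponentially) whenever any requests exception occurs.
2. The on_predicate decorator schedules a retry when the target function's return value meets a given condition. This can be useful when polling a resource for externally generated content.
3. Both backoff decorators optionally accept event-handler functions through the keyword arguments on_success, on_backoff and on_giveup. This can be useful for reporting statistics or performing other custom logging.
'''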
@backoff.on_predicate(backoff.fibo, max_value=13)
@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.HTTPError, requests.exceptions.Timeout),
    max_time=60,
)
@backoff.on_exception(backoff.expo, requests.exceptions.Timeout, max_time=300)
def poll_for_message(queue):
    """Return the next item obtained from ``queue.get()``.

    Three backoff decorators are stacked on this function (applied
    bottom-up): two ``on_exception`` layers for ``requests`` errors and an
    outer ``on_predicate`` layer using a Fibonacci wait generator.
    """
    message = queue.get()
    return message
# Dedicated logger handed to backoff through its ``logger=`` keyword below.
my_logger = logging.getLogger('my_logger')
my_handler = logging.StreamHandler()
# logging.Logger exposes addHandler(); there is no add_handler() method, so
# the previous spelling raised AttributeError at import time.
my_logger.addHandler(my_handler)
my_logger.setLevel(logging.ERROR)
# The exception classes live in ``requests.exceptions`` (plural); the
# previous ``requests.exception`` attribute does not exist.
@backoff.on_exception(backoff.expo,
                      requests.exceptions.RequestException,
                      logger=my_logger)
def test():
    """Print ``1``; wrapped so backoff logs retries through ``my_logger``."""
    print(1)
import time
import random
import backoff
class MyException(Exception):
    """Exception carrying a numeric ``status`` and a text ``message``.

    Both values are also forwarded to ``Exception.__init__`` so they are
    available through ``args``.
    """

    def __init__(self, status, message):
        super().__init__(status, message)
        self.message = message
        self.status = status
def backoff_hdlr(details):
    """Print a report for a backoff event.

    :param details: mapping supplying the ``wait``, ``tries``, ``target``,
        ``args`` and ``kwargs`` keys used by the message template.
    """
    # Message typo fixed: "afters" -> "after".
    print("Backing off {wait:0.1f} seconds after {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))
def success_hdlr(details):
    """Print a report for a success event.

    :param details: mapping supplying the ``tries``, ``target``, ``args``
        and ``kwargs`` keys used by the message template.
    """
    # Message repaired: the garbled "Success offafters" now reads "Success after".
    print("Success after {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))
def giveup_hdlr(details):
    """Print a report for a give-up event.

    :param details: mapping supplying the ``tries``, ``target``, ``args``
        and ``kwargs`` keys used by the message template.
    """
    # Message repaired: the garbled "Giveup off" now reads "Giving up after".
    print("Giving up after {tries} tries "
          "calling function {target} with args {args} and kwargs "
          "{kwargs}".format(**details))
@backoff.on_predicate(
    backoff.constant,
    # Keep retrying while the random number differs from 10009;
    # once it equals 10009 the on_success handler is called.
    lambda x: x != 10009,
    max_time=2,
    on_giveup=giveup_hdlr,
    on_backoff=backoff_hdlr,
    on_success=success_hdlr,
)
def main():
    """Draw a random integer in [10000, 10010], print it and return it.

    Demonstrates ``backoff.on_predicate``: the decorator's predicate is
    evaluated on the returned number.
    """
    drawn = random.randint(10000, 10010)
    print("time is {}, num is {}, retry...".format(time.time(), drawn))
    return drawn
@backoff.on_exception(
    backoff.constant,
    MyException,
    # Stop retrying when the exception instance's status equals 10009;
    # when that condition holds, on_giveup is called.
    giveup=lambda e: e.status == 10009,
    on_giveup=giveup_hdlr,
    on_backoff=backoff_hdlr,
    on_success=success_hdlr,
)
def main2():
    """Draw a random integer in [10000, 10010] and raise unless it is 10010.

    Demonstrates ``backoff.on_exception``: every draw other than 10010
    raises ``MyException`` carrying the drawn number as its status.
    """
    drawn = random.randint(10000, 10010)
    print("time is {}, num is {}, retry...".format(time.time(), drawn))
    if drawn != 10010:
        raise MyException(drawn, "hhh")
    # Leaving through this normal return is what triggers on_success.
    return
if __name__ == "__main__":
    # Script entry point for the backoff demos: only the on_exception demo
    # (main2) runs; the on_predicate demo (main) is left disabled.
    #main()
    main2()
import asyncio
import copy
import inspect
import logging
from functools import wraps
import async_timeout
# Module-level logger used by retry() for its warning/debug messages.
logger = logging.getLogger(__name__)
__version__ = '0.2.2'
# Sentinels compared by identity inside retry(): ``fallback is propagate``
# and ``attempts is forever``. Both names are bound to the same Ellipsis
# object.
propagate = forever = ...
class RetryError(Exception):
    """Default ``fallback`` of ``retry``; raised once the attempts are over."""
class ConditionError(Exception):
    """Raised by ``retry`` when its options cannot be honoured.

    Used when a timeout is requested for a function that is not a
    coroutine function.
    """
def unpartial(fn):
    """Follow the ``.func`` chain of ``fn`` and return the innermost object.

    Objects without a ``func`` attribute are returned unchanged.
    """
    inner = fn
    while True:
        try:
            inner = inner.func
        except AttributeError:
            return inner
def isexception(obj):
    """Return True if ``obj`` is an ``Exception`` instance or subclass."""
    if isinstance(obj, Exception):
        return True
    return inspect.isclass(obj) and issubclass(obj, Exception)
async def callback(attempt, exc, args, kwargs, delay=None, *, loop):
    """Default retry callback: sleep ``attempt * delay`` seconds, then retry.

    :param attempt: number of the attempt that just failed.
    :param exc: the exception raised by that attempt (unused here).
    :param args: positional arguments of the wrapped call (unused here).
    :param kwargs: keyword arguments of the wrapped call (unused here).
    :param delay: base delay in seconds; defaults to ``callback.delay``.
    :param loop: accepted for interface compatibility only; see below.
    :returns: the ``retry`` function, which the retry loop compares by
        identity to decide that another attempt should be made.
    """
    if delay is None:
        delay = callback.delay
    # Written as a native coroutine: ``asyncio.coroutine`` was removed in
    # Python 3.11 and ``asyncio.sleep`` lost its ``loop`` argument in 3.10,
    # so the sleep always runs on the currently running loop.
    await asyncio.sleep(attempt * delay)
    return retry


# Base delay (seconds) used when the caller does not pass ``delay``.
callback.delay = 0.5
def retry(
    fn=None,
    *,
    attempts=3,
    immutable=False,
    cls=False,
    kwargs=False,
    callback=callback,
    fallback=RetryError,
    timeout=None,
    retry_exceptions=(Exception,),
    fatal_exceptions=(asyncio.CancelledError,),
    loop=None  # noqa
):
    """Wrap ``fn`` in a coroutine that retries it when it raises.

    Usable as ``@retry`` or ``@retry(...)``: with ``fn=None`` the decorator
    itself is returned, with a callable ``fn`` the wrapped function is
    returned, anything else raises ``NotImplementedError``.

    :param fn: function or coroutine function to wrap.
    :param attempts: attempt number at which to give up; the ``forever``
        sentinel disables the limit.
    :param immutable: if true, positional and keyword arguments are
        deep-copied before every attempt.
    :param cls: if true, the first positional argument is split off as
        ``self`` (it is not deep-copied) and, when ``loop`` is a string, the
        loop is read from that attribute of ``self``.
    :param kwargs: if true and ``loop`` is a string, the loop is read from
        the call's keyword argument of that name.
    :param callback: called after a failed attempt as
        ``callback(attempt, exc, fn_args, fn_kwargs, loop=...)``; any result
        other than the ``retry`` function itself is returned to the caller.
    :param fallback: what to do when giving up: ``propagate`` re-raises the
        last exception, an exception class/instance is raised from it, a
        callable is called as ``fallback(fn_args, fn_kwargs, loop=...)``,
        any other value is returned as-is.
    :param timeout: per-attempt timeout passed to ``async_timeout.timeout``;
        only valid for coroutine functions.
    :param retry_exceptions: exceptions that trigger a retry.
    :param fatal_exceptions: exceptions that are always re-raised.
    :param loop: an event loop, ``None`` (use ``asyncio.get_event_loop()``)
        or a string name resolved via ``cls`` / ``kwargs``.
    :raises ConditionError: ``timeout`` given for a non-coroutine function.
    """
    def wrapper(fn):
        # NOTE(review): asyncio.coroutine was removed in Python 3.11 --
        # confirm the supported interpreter range.
        @wraps(fn)
        @asyncio.coroutine
        def wrapped(*fn_args, **fn_kwargs):
            # Resolve the event loop to use for this call.
            if isinstance(loop, str):
                # Exactly one of cls / kwargs must say where the name lives.
                assert cls ^ kwargs, 'choose self.loop or kwargs["loop"]'
                if cls:
                    _self = getattr(unpartial(fn), '__self__', None)
                    if _self is None:
                        assert fn_args, 'seems not unbound function'
                        _self = fn_args[0]
                    _loop = getattr(_self, loop)
                elif kwargs:
                    _loop = fn_kwargs[loop]
            elif loop is None:
                _loop = asyncio.get_event_loop()
            else:
                _loop = loop
            # A timeout only makes sense if TimeoutError triggers a retry.
            if (
                timeout is not None and
                asyncio.TimeoutError not in retry_exceptions
            ):
                _retry_exceptions = (asyncio.TimeoutError,) + retry_exceptions
            else:
                _retry_exceptions = retry_exceptions
            attempt = 1
            if cls:
                assert fn_args
                # Keep ``self`` apart so it is never deep-copied below.
                self, *fn_args = fn_args
                fn_args = tuple(fn_args)
            while True:
                if immutable:
                    _fn_args = copy.deepcopy(fn_args)
                    kwargs_loop = isinstance(loop, str) and kwargs
                    # The loop object passed through kwargs is taken out
                    # before the deep copy and put back into both dicts.
                    if kwargs_loop:
                        obj = fn_kwargs.pop(loop)
                    _fn_kwargs = copy.deepcopy(fn_kwargs)
                    if kwargs_loop:
                        fn_kwargs[loop] = _fn_kwargs[loop] = obj
                else:
                    _fn_args, _fn_kwargs = fn_args, fn_kwargs
                if cls:
                    _fn_args = (self,) + _fn_args
                try:
                    ret = fn(*_fn_args, **_fn_kwargs)
                    if timeout is None:
                        if asyncio.iscoroutinefunction(unpartial(fn)):
                            ret = yield from ret
                    else:
                        if not asyncio.iscoroutinefunction(unpartial(fn)):
                            raise ConditionError(
                                'Can\'t set timeout for non coroutinefunction',
                            )
                        # NOTE(review): sync ``with`` and the ``loop``
                        # argument depend on the installed async_timeout
                        # version -- confirm.
                        with async_timeout.timeout(timeout, loop=_loop):
                            ret = yield from ret
                    return ret
                except ConditionError:
                    raise
                except fatal_exceptions:
                    raise
                except _retry_exceptions as exc:
                    _attempts = 'infinity' if attempts is forever else attempts
                    context = {
                        'fn': fn,
                        'attempt': attempt,
                        'attempts': _attempts,
                    }
                    # NOTE(review): because of the ``get_debug()`` term, a
                    # loop in debug mode takes the give-up path on the very
                    # first failure -- confirm this is intended.
                    if (
                        _loop.get_debug() or
                        (attempts is not forever and attempt == attempts)
                    ):
                        logger.warning(
                            exc.__class__.__name__ + ' -> Attempts (%(attempt)d) are over for %(fn)r',  # noqa
                            context,
                            exc_info=exc,
                        )
                        if fallback is propagate:
                            raise exc
                        # Checked before callable(): exception classes are
                        # callable too.
                        if isexception(fallback):
                            raise fallback from exc
                        if callable(fallback):
                            ret = fallback(fn_args, fn_kwargs, loop=_loop)
                            if asyncio.iscoroutinefunction(unpartial(fallback)):  # noqa
                                ret = yield from ret
                        else:
                            ret = fallback
                        return ret
                    logger.debug(
                        exc.__class__.__name__ + ' -> Tried attempt #%(attempt)d from total %(attempts)s for %(fn)r',  # noqa
                        context,
                        exc_info=exc,
                    )
                    ret = callback(
                        attempt, exc, fn_args, fn_kwargs, loop=_loop,
                    )
                    attempt += 1
                    if asyncio.iscoroutinefunction(unpartial(callback)):
                        ret = yield from ret
                    # Only the ``retry`` function itself means "try again";
                    # any other callback result ends the loop.
                    if ret is not retry:
                        return ret
        return wrapped
    if fn is None:
        return wrapper
    if callable(fn):
        return wrapper(fn)
    raise NotImplementedError
import logging
import random
import time
import decorator
from functools import partial
# Default logger used by the retry helpers below when the caller does not
# supply one.
logging_logger = logging.getLogger(__name__)
def __retry_internal(f, exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1, jitter=0,
                     logger=logging_logger):
    """
    Run ``f`` and run it again each time it raises one of ``exceptions``.

    :param f: the function to execute (called without arguments).
    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    :returns: the result of the f function.
    """
    remaining = tries
    pause = delay
    while remaining:
        try:
            return f()
        except exceptions as err:
            remaining -= 1
            if not remaining:
                # Out of attempts: let the last exception propagate.
                raise
            if logger is not None:
                logger.warning('%s, retrying in %s seconds...', err, pause)
            time.sleep(pause)
            # Grow the delay for the next round: multiply, add jitter, cap.
            pause *= backoff
            pause += random.uniform(*jitter) if isinstance(jitter, tuple) else jitter
            if max_delay is not None:
                pause = min(pause, max_delay)
def retry(exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1, jitter=0, logger=logging_logger):
    """Returns a retry decorator.

    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    :returns: a retry decorator.
    """
    # The file binds ``decorator`` to the *module* (``import decorator``), so
    # the previous ``@decorator`` tried to call a module object. A plain
    # functools.wraps wrapper gives the same call-through behaviour with
    # only the standard library.
    from functools import wraps

    def retry_decorator(f):
        @wraps(f)
        def wrapper(*fargs, **fkwargs):
            # Bind the call's arguments so the retry loop can invoke f().
            return __retry_internal(partial(f, *fargs, **fkwargs), exceptions, tries, delay, max_delay, backoff,
                                    jitter, logger)
        return wrapper

    return retry_decorator
def retry_call(f, fargs=None, fkwargs=None, exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1,
               jitter=0,
               logger=logging_logger):
    """
    Call ``f`` with the given arguments, re-executing it when it fails.

    :param f: the function to execute.
    :param fargs: the positional arguments of the function to execute.
    :param fkwargs: the named arguments of the function to execute.
    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    :returns: the result of the f function.
    """
    positional = fargs or list()
    named = fkwargs or dict()
    bound = partial(f, *positional, **named)
    return __retry_internal(bound, exceptions, tries, delay, max_delay, backoff, jitter, logger)
def retry_handler(retry_time: int, retry_interval: float, retry_on_exception: [BaseException], *args, **kwargs):
    """Build a ``retry`` decorator from a few simple settings.

    :param retry_time: passed to ``retry`` as ``stop_max_attempt_number``.
    :param retry_interval: multiplied by 1000 and used as the fixed wait
        (presumably seconds converted to milliseconds -- confirm with callers).
    :param retry_on_exception: iterable of exception types; an exception
        that is an instance of any of them triggers a retry.
    :returns: whatever ``retry(...)`` returns for these settings.
    """
    def is_exception(exception: [BaseException]):
        return any(isinstance(exception, exp) for exp in retry_on_exception)

    def _retry(*args, **kwargs):
        sleeper = Retrying(wait_fixed=retry_interval * 1000)
        return sleeper.fixed_sleep(*args, **kwargs)

    return retry(
        stop_max_attempt_number=retry_time,
        retry_on_exception=is_exception,
        wait_func=_retry,
    )
class ProxyUtil:
    """Demo class used to exercise ``retry_handler`` on a failing method."""

    def get_proxies(self):
        """Request the proxy endpoint, then raise ``IOError`` unconditionally.

        NOTE(review): the URL literal looks like a placeholder, and the
        unconditional raise makes everything after it unreachable --
        presumably left in to trigger the retry demo; confirm before reuse.
        The ``params`` dict built below is also never returned.
        """
        r = requests.get('代理地址')
        print('正在获取')
        raise IOError
        # raise IndexError
        print('获取到最新代理 = %s' % r.text)
        params = dict()
        if r and r.status_code == 200:
            proxy = str(r.content, encoding='utf-8')
            params['http'] = 'http://' + proxy
            params['https'] = 'https://' + proxy

    @retry_handler(retry_time=2, retry_interval=5, retry_on_exception=[IOError,IndexError])
    # @retry(stop_max_attempt_number=5,retry_on_exception=wraper)
    def retry_test(self):
        """Call ``get_proxies`` under the retry decorator, then print ``'io'``."""
        self.get_proxies()
        print('io')
if __name__ == '__main__':
    # Run the retry_handler demo when executed as a script.
    proxy = ProxyUtil()
    proxy.retry_test()
# demo usage
@retry
def never_give_up_never_surrender():
    """Demo: bare ``@retry`` with no options."""
    # print() call instead of the Python 2 print statement (a syntax error
    # on Python 3, which the rest of the file requires).
    print("Retry forever ignoring Exceptions, don't wait between retries")
# The four decorator demos below had lost their own functions and were all
# stacked on the retry_if_io_error predicate; each one is given its own demo
# function again and the predicate is left undecorated.
@retry(stop_max_attempt_number=7)
def stop_after_7_attempts():
    """Demo: ``stop_max_attempt_number=7``."""
    print("Stopping after 7 attempts")


@retry(stop_max_delay=10000)
def stop_after_10_s():
    """Demo: ``stop_max_delay=10000``."""
    print("Stopping after 10 seconds")


@retry(wait_fixed=2000)
def wait_2_s():
    """Demo: ``wait_fixed=2000``."""
    print("Wait 2 second between retries")


@retry(wait_random_min=1000, wait_random_max=2000)
def wait_random_1_to_2_s():
    """Demo: ``wait_random_min=1000, wait_random_max=2000``."""
    print("Randomly wait 1 to 2 seconds between retries")


def retry_if_io_error(exception):
    """Return True if we should retry (in this case when it's an IOError), False otherwise"""
    return isinstance(exception, IOError)
@retry(retry_on_exception=retry_if_io_error)
def might_io_error():
    """Demo: retry decided by the ``retry_if_io_error`` predicate."""
    # print() call instead of the Python 2 print statement.
    print("Retry forever with no wait if an IOError occurs, raise any other errors")
@retry(retry_on_exception=retry_if_io_error, wrap_exception=True)
def only_raise_retry_error_when_not_io_error():
    """Demo: as ``might_io_error`` but with ``wrap_exception=True``."""
    # print() call instead of the Python 2 print statement.
    print("Retry forever with no wait if an IOError occurs, raise any other errors wrapped in RetryError")
def retry_if_result_none(result):
    """Tell the retry machinery to try again exactly when ``result`` is None."""
    if result is None:
        return True
    return False
@retry(retry_on_result=retry_if_result_none)
def might_return_none():
    """Demo: retry decided by the ``retry_if_result_none`` predicate."""
    # print() call instead of the Python 2 print statement.
    print("Retry forever ignoring Exceptions with no wait if return value is None")
import inspect
import random
import sys
import time
import traceback
import six
# Upper bound used as the default cap for incrementing and exponential
# waits. The value is 2**30 - 1, i.e. sys.maxint / 2 on a 32-bit Python 2;
# it is hard-coded because Python 3 has no sys.maxint.
MAX_WAIT = 1073741823
def _retry_if_exception_of_type(retryable_types):
    """Build a predicate that is true for instances of ``retryable_types``.

    ``retryable_types`` is handed to ``isinstance`` unchanged, so it may be
    a single type or a tuple of types.
    """
    def _retry_if_exception_these_types(exception):
        matches = isinstance(exception, retryable_types)
        return matches

    return _retry_if_exception_these_types
def retry(*dargs, **dkw):
    """
    Decorator function that instantiates the Retrying object
    @param *dargs: positional arguments passed to Retrying object
    @param **dkw: keyword arguments passed to the Retrying object

    Generator functions are routed to ``Retrying.call_async`` when
    ``deterministic_generators`` is true in ``dkw``; everything else goes
    through ``Retrying.call``.
    """
    # support both @retry and @retry() as valid syntax
    bare = len(dargs) == 1 and callable(dargs[0])
    # In the bare form the only positional argument is the decorated
    # function itself, so nothing positional is forwarded to Retrying.
    retrying_args = () if bare else dargs

    def wrap(f):
        is_generator = inspect.isasyncgenfunction(f) or inspect.isgeneratorfunction(f)

        @six.wraps(f)
        def wrapped_f(*args, **kw):
            if dkw.get('deterministic_generators'):
                if is_generator:
                    # Keyword options are forwarded in the bare form too:
                    # call_async() asserts the flag is set on the instance,
                    # which a plain Retrying() never satisfied.
                    return Retrying(*retrying_args, **dkw).call_async(f, *args, **kw)
                # call() asserts the flag is off, so clear it for ordinary
                # functions instead of failing with AssertionError.
                plain_kw = dict(dkw, deterministic_generators=False)
                return Retrying(*retrying_args, **plain_kw).call(f, *args, **kw)
            return Retrying(*retrying_args, **dkw).call(f, *args, **kw)

        return wrapped_f

    if bare:
        return wrap(dargs[0])
    return wrap
class Retrying(object):
    """Retry engine: calls a function repeatedly under stop/wait policies.

    All wait and delay values are in milliseconds (``call`` sleeps for
    ``sleep / 1000.0`` seconds).
    """

    def __init__(self,
                 stop=None, wait=None,
                 stop_max_attempt_number=None,
                 stop_max_delay=None,
                 wait_fixed=None,
                 wait_random_min=None, wait_random_max=None,
                 wait_incrementing_start=None, wait_incrementing_increment=None,
                 wait_incrementing_max=None,
                 wait_exponential_multiplier=None, wait_exponential_max=None,
                 retry_on_exception=None,
                 retry_on_result=None,
                 wrap_exception=False,
                 stop_func=None,
                 wait_func=None,
                 wait_jitter_max=None,
                 before_attempts=None,
                 after_attempts=None,
                 deterministic_generators=False):
        """Store the policy settings and select the stop/wait/reject callables.

        ``stop`` / ``wait`` may name a method of this class; ``stop_func`` /
        ``wait_func`` take precedence over them. Passing a ``stop_*`` or
        ``wait_*`` option as non-None enables the matching policy; the
        stored defaults below are only the values that policy would use.
        """
        self._stop_max_attempt_number = 5 if stop_max_attempt_number is None else stop_max_attempt_number
        self._stop_max_delay = 100 if stop_max_delay is None else stop_max_delay
        self._wait_fixed = 1000 if wait_fixed is None else wait_fixed
        self._wait_random_min = 0 if wait_random_min is None else wait_random_min
        self._wait_random_max = 1000 if wait_random_max is None else wait_random_max
        self._wait_incrementing_start = 0 if wait_incrementing_start is None else wait_incrementing_start
        self._wait_incrementing_increment = 100 if wait_incrementing_increment is None else wait_incrementing_increment
        self._wait_exponential_multiplier = 1 if wait_exponential_multiplier is None else wait_exponential_multiplier
        self._wait_exponential_max = MAX_WAIT if wait_exponential_max is None else wait_exponential_max
        self._wait_incrementing_max = MAX_WAIT if wait_incrementing_max is None else wait_incrementing_max
        self._wait_jitter_max = 0 if wait_jitter_max is None else wait_jitter_max
        self._before_attempts = before_attempts
        self._after_attempts = after_attempts
        self._deterministic_generators = deterministic_generators
        # Index of the last item already yielded by call_async; -1 = none.
        self._deterministic_offset = -1
        # TODO add chaining of stop behaviors
        # stop behavior
        stop_funcs = []
        if stop_max_attempt_number is not None:
            stop_funcs.append(self.stop_after_attempt)
        if stop_max_delay is not None:
            stop_funcs.append(self.stop_after_delay)
        if stop_func is not None:
            self.stop = stop_func
        elif stop is None:
            # Stop as soon as any enabled policy says so; with no policy
            # enabled, any() of an empty sequence is False (never stop).
            self.stop = lambda attempts, delay: any(f(attempts, delay) for f in stop_funcs)
        else:
            self.stop = getattr(self, stop)
        # TODO add chaining of wait behaviors
        # wait behavior
        # The zero-wait entry keeps max() below well-defined when no wait
        # policy is enabled.
        wait_funcs = [lambda *args, **kwargs: 0]
        if wait_fixed is not None:
            wait_funcs.append(self.fixed_sleep)
        if wait_random_min is not None or wait_random_max is not None:
            wait_funcs.append(self.random_sleep)
        if wait_incrementing_start is not None or wait_incrementing_increment is not None:
            wait_funcs.append(self.incrementing_sleep)
        if wait_exponential_multiplier is not None or wait_exponential_max is not None:
            wait_funcs.append(self.exponential_sleep)
        if wait_func is not None:
            self.wait = wait_func
        elif wait is None:
            # Wait for the longest of the enabled policies.
            self.wait = lambda attempts, delay: max(f(attempts, delay) for f in wait_funcs)
        else:
            self.wait = getattr(self, wait)
        # retry on exception filter
        if retry_on_exception is None:
            self._retry_on_exception = self.always_reject
        else:
            # this allows for providing a tuple of exception types that
            # should be allowed to retry on, and avoids having to create
            # a callback that does the same thing
            if isinstance(retry_on_exception, (tuple)):
                retry_on_exception = _retry_if_exception_of_type(
                    retry_on_exception)
            self._retry_on_exception = retry_on_exception
        # retry on result filter
        if retry_on_result is None:
            self._retry_on_result = self.never_reject
        else:
            self._retry_on_result = retry_on_result
        self._wrap_exception = wrap_exception

    def stop_after_attempt(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the previous attempt >= stop_max_attempt_number."""
        return previous_attempt_number >= self._stop_max_attempt_number

    def stop_after_delay(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Stop after the time from the first attempt >= stop_max_delay."""
        return delay_since_first_attempt_ms >= self._stop_max_delay

    @staticmethod
    def no_sleep(previous_attempt_number, delay_since_first_attempt_ms):
        """Don't sleep at all before retrying."""
        return 0

    def fixed_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Sleep a fixed amount of time between each retry."""
        return self._wait_fixed

    def random_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Sleep a random amount of time between wait_random_min and wait_random_max"""
        return random.randint(self._wait_random_min, self._wait_random_max)

    def incrementing_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """
        Sleep an incremental amount of time after each attempt, starting at
        wait_incrementing_start and incrementing by wait_incrementing_increment
        """
        result = self._wait_incrementing_start + (self._wait_incrementing_increment * (previous_attempt_number - 1))
        if result > self._wait_incrementing_max:
            result = self._wait_incrementing_max
        if result < 0:
            result = 0
        return result

    def exponential_sleep(self, previous_attempt_number, delay_since_first_attempt_ms):
        """Return ``multiplier * 2 ** attempt``, clamped to [0, wait_exponential_max]."""
        exp = 2 ** previous_attempt_number
        result = self._wait_exponential_multiplier * exp
        if result > self._wait_exponential_max:
            result = self._wait_exponential_max
        if result < 0:
            result = 0
        return result

    @staticmethod
    def never_reject(result):
        """Default result filter: no result triggers a retry."""
        return False

    @staticmethod
    def always_reject(result):
        """Default exception filter: every exception triggers a retry."""
        return True

    def should_reject(self, attempt):
        """Return whether ``attempt`` must be retried.

        Failed attempts are judged by the exception filter (given the
        exception value from the stored ``sys.exc_info()`` triple),
        successful ones by the result filter.
        """
        reject = False
        if attempt.has_exception:
            reject |= self._retry_on_exception(attempt.value[1])
        else:
            reject |= self._retry_on_result(attempt.value)
        return reject

    def call(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` until an attempt is accepted or a stop policy fires.

        Generator functions are fully consumed into a list on each attempt.
        On stop, the last exception is re-raised, or ``RetryError`` is
        raised when ``wrap_exception`` is set or the last attempt returned
        a rejected result.
        """
        self._deterministic_offset = -1
        # NOTE(review): assert is used for validation and disappears under -O.
        assert not self._deterministic_generators
        is_generator = inspect.isasyncgenfunction(fn) or inspect.isgeneratorfunction(fn)
        start_time = int(round(time.time() * 1000))
        attempt_number = 1
        while True:
            if self._before_attempts:
                self._before_attempts(attempt_number)
            try:
                if is_generator:
                    result = list(fn(*args, **kwargs))
                else:
                    result = fn(*args, **kwargs)
                attempt = Attempt(result, attempt_number, False)
            # NOTE(review): the bare except also captures KeyboardInterrupt
            # and SystemExit and feeds them to the retry filter -- confirm
            # this is wanted.
            except:
                tb = sys.exc_info()
                attempt = Attempt(tb, attempt_number, True)
            if not self.should_reject(attempt):
                if is_generator:
                    return self._yelded_data(attempt)
                return attempt.get(self._wrap_exception)
            if self._after_attempts:
                self._after_attempts(attempt_number)
            delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time
            if self.stop(attempt_number, delay_since_first_attempt_ms):
                if not self._wrap_exception and attempt.has_exception:
                    # get() on an attempt with an exception should cause it to be raised, but raise just in case
                    raise attempt.get()
                else:
                    raise RetryError(attempt)
            else:
                sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
                if self._wait_jitter_max:
                    jitter = random.random() * self._wait_jitter_max
                    sleep = sleep + max(0, jitter)
                time.sleep(sleep / 1000.0)
            attempt_number += 1

    def call_async(self, fn, *args, **kwargs):
        """Generator variant of ``call`` for generator functions.

        Items are passed through as ``fn`` produces them; after a failure
        the generator is restarted and the items already delivered are
        skipped (see ``_deterministic_generation``). Waiting between
        attempts uses the blocking ``time.sleep``.
        """
        self._deterministic_offset = -1
        assert self._deterministic_generators
        assert inspect.isasyncgenfunction(fn) or inspect.isgeneratorfunction(fn)
        start_time = int(round(time.time() * 1000))
        attempt_number = 1
        while True:
            if self._before_attempts:
                self._before_attempts(attempt_number)
            try:
                result = yield from self._deterministic_generation(fn, *args, **kwargs)
                attempt = Attempt(result, attempt_number, False)
            except:
                tb = sys.exc_info()
                attempt = Attempt(tb, attempt_number, True)
            if not self.should_reject(attempt):
                return self._yelded_data(attempt)
            if self._after_attempts:
                self._after_attempts(attempt_number)
            delay_since_first_attempt_ms = int(round(time.time() * 1000)) - start_time
            if self.stop(attempt_number, delay_since_first_attempt_ms):
                if not self._wrap_exception and attempt.has_exception:
                    # get() on an attempt with an exception should cause it to be raised, but raise just in case
                    raise attempt.get()
                else:
                    raise RetryError(attempt)
            else:
                sleep = self.wait(attempt_number, delay_since_first_attempt_ms)
                if self._wait_jitter_max:
                    jitter = random.random() * self._wait_jitter_max
                    sleep = sleep + max(0, jitter)
                time.sleep(sleep / 1000.0)
            attempt_number += 1

    def _yelded_data(self, attempt):
        """Yield the items of the attempt's value (or raise its exception)."""
        yield from attempt.get(self._wrap_exception)

    def _deterministic_generation(self, fn, *args, **kwargs):
        """Yield items of ``fn(...)`` that lie beyond the recorded offset.

        The offset advances after each item is delivered, so a restarted
        generator skips everything already handed to the consumer.
        """
        for i, v in enumerate(fn(*args, **kwargs)):
            if i <= self._deterministic_offset:
                continue
            yield v
            self._deterministic_offset = i
class Attempt(object):
    """
    Outcome of a single call to the target function: either the value it
    returned, or the ``sys.exc_info()`` triple of the exception it raised.
    """

    def __init__(self, value, attempt_number, has_exception):
        self.value = value
        self.attempt_number = attempt_number
        self.has_exception = has_exception

    def get(self, wrap_exception=False):
        """
        Return the stored value, or raise the stored exception.

        With ``wrap_exception`` true a failed Attempt is raised wrapped in
        a RetryError instead of being re-raised as-is.
        """
        if not self.has_exception:
            return self.value
        if wrap_exception:
            raise RetryError(self)
        exc_type = self.value[0]
        exc_value = self.value[1]
        exc_tb = self.value[2]
        six.reraise(exc_type, exc_value, exc_tb)

    def __repr__(self):
        if not self.has_exception:
            return "Attempts: {0}, Value: {1}".format(self.attempt_number, self.value)
        trace = "".join(traceback.format_tb(self.value[2]))
        return "Attempts: {0}, Error:\n{1}".format(self.attempt_number, trace)
class RetryError(Exception):
    """
    A RetryError encapsulates the last Attempt instance right before giving up.
    """

    def __init__(self, last_attempt):
        # Initialise the base class so ``args`` is populated (needed for a
        # meaningful repr and for pickling/copying the exception).
        super(RetryError, self).__init__(last_attempt)
        self.last_attempt = last_attempt

    def __str__(self):
        return "RetryError[{0}]".format(self.last_attempt)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment