Skip to content

Instantly share code, notes, and snippets.

@caiaffa
Created July 16, 2019 20:15
Show Gist options
  • Save caiaffa/ba5b7cfcbdf8e9c467809afa5e0aa4a0 to your computer and use it in GitHub Desktop.
Save caiaffa/ba5b7cfcbdf8e9c467809afa5e0aa4a0 to your computer and use it in GitHub Desktop.
import calendar
import time
from datetime import timedelta, datetime
from aiobreaker import CircuitBreaker
from aiobreaker.state import STATE_CLOSED
from aiobreaker.storage.redis import CircuitRedisStorage
from functools import wraps
from configs.settings import CB_SETTINGS, CACHE_KEY_PREFIX
from helpers.cache_servers import RedisServer
# Module-level cache client shared by every wrapper that circuit_breaker
# creates: it holds the per-namespace "-custom-opened-at" timestamp and is
# also passed to CircuitRedisStorage as its redis_object.
# NOTE(review): RedisServer is a project helper; presumably it exposes a
# redis-py compatible get/set/delete interface -- confirm.
redis = RedisServer()
def circuit_breaker(service='default'):
    """Build a decorator that runs an async callable through a circuit breaker.

    Per-function settings are read at call time from
    ``CB_SETTINGS[service][func.__name__]``, which must provide
    ``'RESET_TIMEOUT'`` (seconds) and ``'FAIL_MAX'``. Breaker state is kept
    in Redis under the namespace
    ``{CACHE_KEY_PREFIX}:circuit-break:{service}:{func.__name__}``.

    Alongside the breaker state, a ``{namespace}-custom-opened-at`` key
    stores the epoch second at which the current timeout window started.
    Once that window is older than ``RESET_TIMEOUT``, the key is deleted and
    the breaker for that call is built with a zero ``timeout_duration``.

    Args:
        service: Key into ``CB_SETTINGS`` selecting the group of settings
            for the decorated function.

    Returns:
        A decorator that wraps an async callable; the wrapper awaits
        ``CircuitBreaker.call_async(func, *args, **kwargs)`` and returns its
        result.

    Raises:
        KeyError: At call time, if ``CB_SETTINGS`` has no entry for the
            service / function name or a required setting is missing.
        Any exception raised by ``CircuitBreaker.call_async`` propagates
        unchanged.
    """
    def circuit(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            settings = CB_SETTINGS[service][func.__name__]
            reset_timeout = settings['RESET_TIMEOUT']
            namespace = f'{CACHE_KEY_PREFIX}:circuit-break:{service}:{func.__name__}'
            opened_at_key = f'{namespace}-custom-opened-at'

            # Use the real Unix epoch. Deriving the timestamp from naive local
            # time (calendar.timegm(datetime.now().timetuple())) shifts it by
            # the host's UTC offset, makes it jump at DST changes, and makes
            # hosts in different time zones disagree about the shared key.
            now = int(time.time())

            # NOTE(review): get-then-set is not atomic; concurrent first calls
            # may each write their own timestamp (values differ by at most
            # the time between the calls).
            opened_at = redis.get(opened_at_key)
            if not opened_at:
                redis.set(opened_at_key, now)
                opened_at = now

            # int() also accepts the bytes/str form a Redis client may return.
            if int(opened_at) + reset_timeout < now:
                # The window has elapsed: build the breaker with no timeout
                # and drop the marker so the next call starts a new window.
                reset_timeout = 0
                redis.delete(opened_at_key)

            cb = CircuitBreaker(
                fail_max=settings['FAIL_MAX'],
                timeout_duration=timedelta(seconds=reset_timeout),
                state_storage=CircuitRedisStorage(
                    state=STATE_CLOSED,
                    redis_object=redis,
                    namespace=namespace,
                ),
            )
            return await cb.call_async(func, *args, **kwargs)
        return wrapper
    return circuit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment