Skip to content

Instantly share code, notes, and snippets.

@logsv
Last active October 2, 2019 13:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save logsv/ca9182d816437e5488bc62803758686f to your computer and use it in GitHub Desktop.
Save logsv/ca9182d816437e5488bc62803758686f to your computer and use it in GitHub Desktop.
Circuit Breaker
"""
Circuit Breaker Implementation.
"""
from datetime import datetime
# Circuit states.
CLOSED, OPEN, HALF_OPEN = 0, 1, 2

# Default tuning values.
DEFAULT_THRESHOLD = 5  # failure-count threshold
DEFAULT_TIMEOUT = 60  # threshold timeout, in seconds
class FailureException(Exception):
    """Raised by ``CircuitBreaker.run`` when the wrapped callable raises."""
class CircuitBreakerOpen(Exception):
    """Raised by ``CircuitBreaker.run`` when the circuit is open."""
class CircuitBreaker:
    """Guard a callable with a failure-counting circuit breaker.

    The breaker is always in one of three states:

    * ``CLOSED`` - the failure count does not exceed ``threshold``; calls
      go through.
    * ``OPEN`` - the failure count exceeds ``threshold`` and the most
      recent failure is no older than ``timeout`` seconds; calls are
      rejected with ``CircuitBreakerOpen``.
    * ``HALF_OPEN`` - the failure count exceeds ``threshold`` but more than
      ``timeout`` seconds have passed since the most recent failure; calls
      go through again as a trial.

    A successful call clears the failure count and closes the circuit.

    Keyword Args:
        threshold: Number of failures that must be exceeded before the
            circuit opens. Defaults to ``DEFAULT_THRESHOLD``.
        timeout: Seconds to wait after the last failure before a trial
            call is allowed. Defaults to ``DEFAULT_TIMEOUT``.
    """

    def __init__(self, *args, **kwargs):
        # Positional arguments and unrecognised keywords are accepted and
        # ignored, as before, so existing call sites keep working.
        self.state = CLOSED
        self.threshold = kwargs.get('threshold', DEFAULT_THRESHOLD)
        self.timeout = kwargs.get('timeout', DEFAULT_TIMEOUT)
        self.failures = 0
        self.last_failed = None

    def _set_state(self):
        """Recompute ``self.state`` from the failure count and last failure time."""
        # The comparison is strict: the circuit leaves CLOSED only once the
        # count is greater than the threshold. ``last_failed`` can only be
        # None here if no failure was ever recorded (e.g. a negative
        # threshold); treat that as CLOSED instead of raising TypeError.
        if self.failures > self.threshold and self.last_failed is not None:
            elapsed = datetime.now() - self.last_failed
            if elapsed.total_seconds() > self.timeout:
                self.state = HALF_OPEN
            else:
                self.state = OPEN
        else:
            self.state = CLOSED

    def _reset_state(self):
        """Clear the failure bookkeeping and close the circuit."""
        self.failures = 0
        self.last_failed = None
        self.state = CLOSED

    def _set_failures(self):
        """Record one more failure, timestamped with the current time."""
        self.failures += 1
        self.last_failed = datetime.now()

    def run(self, callable):
        """Invoke ``callable`` through the breaker and return its result.

        Args:
            callable: Object invoked with no arguments. The parameter name
                shadows the builtin; it is kept so keyword callers do not
                break.

        Returns:
            Whatever ``callable()`` returns.

        Raises:
            CircuitBreakerOpen: The circuit is open; ``callable`` was not
                invoked.
            FailureException: ``callable`` raised an ``Exception``; the
                original error is attached as ``__cause__``.
        """
        self._set_state()
        if self.state == OPEN:
            raise CircuitBreakerOpen("Circuit Open!")

        # State is CLOSED or HALF_OPEN here: let the call through.
        try:
            result = callable()
        except Exception as e:
            self._set_failures()
            raise FailureException("Failure!") from e

        # Reset only after the call has succeeded. Returning from inside
        # the ``try`` suite would skip a ``try``/``else`` clause, so the
        # reset is done explicitly before returning.
        self._reset_state()
        return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment