Skip to content

Instantly share code, notes, and snippets.

@JackStouffer
Last active October 14, 2016 15:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JackStouffer/ff2d5dbc61d0713bb24dfe1d5acaf31a to your computer and use it in GitHub Desktop.
Save JackStouffer/ff2d5dbc61d0713bb24dfe1d5acaf31a to your computer and use it in GitHub Desktop.
A circuit breaker: a way to fail fast automatically, relieving pressure on an external service after a number of failures in quick succession.
import datetime
# Breaker states, numbered 0..2 in this order:
#   BREAKER_OPEN      -> status_or_throw() raises
#   BREAKER_HALF_OPEN -> intermediate state between open and closed
#   BREAKER_CLOSED    -> initial state of a new breaker
BREAKER_OPEN, BREAKER_HALF_OPEN, BREAKER_CLOSED = range(3)
class OpenException(Exception):
    """Raised by ``CircuitBreaker.status_or_throw`` while the breaker is open."""
class CircuitBreaker(object):
    """Error-counting circuit breaker with time-based recovery.

    A new breaker starts in ``BREAKER_CLOSED``. Callers report failures with
    ``increment_error()``; once ``open_level`` errors have accumulated the
    breaker switches to ``BREAKER_OPEN`` and ``status_or_throw()`` raises
    ``OpenException``. After ``open_rollover_time`` seconds the breaker is
    reported as ``BREAKER_HALF_OPEN``; an error in that state re-opens it,
    while staying error free until ``2 * open_rollover_time`` seconds after
    it opened lets it return to ``BREAKER_CLOSED``.

    State transitions that depend only on elapsed time are applied lazily,
    when ``get_status()`` or ``increment_error()`` is called.
    """

    def __init__(self, open_level=500, error_rollover_time=3600, open_rollover_time=200):
        """ Ctor

            open_level = n number of errors to wait for to open the breaker

            error_rollover_time = the number of seconds between errors that
            are needed to reset the error counter

            open_rollover_time = the number of seconds the breaker stays
            BREAKER_OPEN before it becomes BREAKER_HALF_OPEN, and also the
            number of further error free seconds needed before it switches
            from BREAKER_HALF_OPEN to BREAKER_CLOSED
        """
        self._error_count = 0
        self._status = BREAKER_CLOSED

        self._open_level = open_level
        self._error_rollover_time = error_rollover_time
        self._open_rollover_time = open_rollover_time

        # Both timestamps stay None until the first error / first opening.
        self._last_error_timestamp = None
        self._open_timestamp = None

    def increment_error(self):
        """ Record one error.

            The error counter is incremented when this error is within
            error_rollover_time of the last error, otherwise the counter
            restarts at 1. Reaching open_level opens the breaker, and any
            error while the breaker is half open re-opens it.
        """
        # One clock reading for the whole call so every comparison and every
        # stored timestamp refers to the same instant.
        now = datetime.datetime.now()

        # OPEN -> HALF_OPEN is otherwise only applied inside get_status().
        # Apply it here too, so an error that arrives after the cool-down
        # re-opens the breaker even if nobody polled the status in between.
        if (
            self._status == BREAKER_OPEN and
            now - self._open_timestamp > datetime.timedelta(seconds=self._open_rollover_time)
        ):
            self._status = BREAKER_HALF_OPEN

        if self._status == BREAKER_HALF_OPEN:
            self._status = BREAKER_OPEN
            self._open_timestamp = now

        error_window = datetime.timedelta(seconds=self._error_rollover_time)
        if (
            self._last_error_timestamp is None or
            now - self._last_error_timestamp <= error_window
        ):
            self._error_count += 1
        else:
            self._error_count = 1

        self._last_error_timestamp = now

        if self._error_count == self._open_level:
            self._status = BREAKER_OPEN
            self._open_timestamp = now

    def get_status(self):
        """ Get the current status of the breaker

            Applies any pending time-based transition first and always
            returns one of BREAKER_OPEN, BREAKER_HALF_OPEN or BREAKER_CLOSED.
        """
        if self._status == BREAKER_CLOSED:
            return BREAKER_CLOSED

        # A single elapsed value is compared against both thresholds, so
        # there is no gap between branches for the call to fall through.
        elapsed = datetime.datetime.now() - self._open_timestamp

        if self._status == BREAKER_OPEN:
            if elapsed <= datetime.timedelta(seconds=self._open_rollover_time):
                return BREAKER_OPEN
            self._status = BREAKER_HALF_OPEN
            return BREAKER_HALF_OPEN

        # Half open: close only once strictly more than twice the rollover
        # time has passed since the breaker (re-)opened.
        if elapsed > datetime.timedelta(seconds=self._open_rollover_time * 2):
            self._status = BREAKER_CLOSED
            self._error_count = 0
            return BREAKER_CLOSED

        return BREAKER_HALF_OPEN

    def status_or_throw(self):
        """ If the breaker is open, don't allow anything through

            Raises OpenException when get_status() reports BREAKER_OPEN;
            otherwise returns None.
        """
        if self.get_status() == BREAKER_OPEN:
            raise OpenException()
#! ../env/bin/python
# -*- coding: utf-8 -*-
import pytest
import time
from circuit_breaker import CircuitBreaker, BREAKER_HALF_OPEN, BREAKER_OPEN, BREAKER_CLOSED
class TestCircuitBreaker:
    """Behavioural tests for CircuitBreaker.

    Several tests use real ``time.sleep`` calls to let the breaker's
    time-based transitions happen.
    """

    def test_increment(self):
        # A single error on a fresh breaker is counted once.
        c = CircuitBreaker()
        c.increment_error()
        assert c._error_count == 1

    def test_error_rollover(self):
        # An error arriving after error_rollover_time restarts the count.
        c = CircuitBreaker(error_rollover_time=1)
        c.increment_error()
        c.increment_error()

        time.sleep(1)
        c.increment_error()
        assert c._error_count == 1

    def test_open(self):
        # The breaker stays closed until exactly open_level errors are seen.
        c = CircuitBreaker(open_level=10)

        for _ in range(9):
            c.increment_error()
            assert c.get_status() == BREAKER_CLOSED

        c.increment_error()
        assert c.get_status() == BREAKER_OPEN

    def test_throw(self):
        # Imported here because the module-level import of circuit_breaker
        # names does not include OpenException.
        from circuit_breaker import OpenException

        c = CircuitBreaker(open_level=2)
        c.increment_error()
        c.increment_error()

        with pytest.raises(OpenException):
            c.status_or_throw()

    def test_half_open(self):
        # After open_rollover_time the open breaker reports half open.
        c = CircuitBreaker(open_level=3, open_rollover_time=2)

        for _ in range(3):
            c.increment_error()

        assert c.get_status() == BREAKER_OPEN
        time.sleep(2)
        assert c.get_status() == BREAKER_HALF_OPEN

    def test_reopen(self):
        # An error while half open sends the breaker straight back to open.
        c = CircuitBreaker(open_level=3, open_rollover_time=1)

        for _ in range(3):
            c.increment_error()

        assert c.get_status() == BREAKER_OPEN
        time.sleep(1)
        assert c.get_status() == BREAKER_HALF_OPEN
        c.increment_error()
        assert c.get_status() == BREAKER_OPEN

    def test_auto_close(self):
        # An error free half-open period closes the breaker and resets the
        # counter, so it again takes open_level errors to open it.
        c = CircuitBreaker(open_level=3, open_rollover_time=1)

        for _ in range(3):
            c.increment_error()

        assert c.get_status() == BREAKER_OPEN
        time.sleep(1)
        assert c.get_status() == BREAKER_HALF_OPEN
        time.sleep(1)
        assert c.get_status() == BREAKER_CLOSED

        c.increment_error()
        c.increment_error()
        assert c.get_status() == BREAKER_CLOSED
        c.increment_error()
        assert c.get_status() == BREAKER_OPEN
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment