Skip to content

Instantly share code, notes, and snippets.

@jrydberg
Created November 14, 2012 17:28
Show Gist options
  • Save jrydberg/4073498 to your computer and use it in GitHub Desktop.
Save jrydberg/4073498 to your computer and use it in GitHub Desktop.
simple gevent supervisor
class UnhandledExceptionError(Exception):
    """The supervised action failed with an exception the supervisor
    was not told to handle.

    The supervisor raises it with two positional arguments: the
    original exception instance and the exc_info recorded on the
    child greenlet.  Both are available through C{args}.
    """
class SupervisorKilledError(Exception):
    """The supervisor was killed.

    This is the exception that C{close()} throws into the supervisor
    greenlet to terminate it.
    """
class SupervisorAbortedError(Exception):
    """The supervisor gave up because the action did not complete
    within the configured abort time limit, including restarts.
    """
class _Supervisor(SilentGreenlet):
    """Run a given function inside a separate greenlet, and possibly
    restart it if it fails.

    The supervisor is itself a greenlet: its result (via C{get()}) is
    the return value of the first successful run of the function.
    The configuration methods (C{handle}, C{timeout}, C{abort_after},
    C{start}) return C{self} so that they can be chained.
    """

    def __init__(self, log, clock, max_delay, fn, args, kw):
        """Create a supervisor for C{fn(*args, **kw)}.

        @param log: logger-like object; C{info}, C{error} and C{debug}
            are called on it.
        @param clock: object providing C{time()} and C{sleep(seconds)}.
        @param max_delay: upper bound, in seconds, for the delay
            between restarts.
        @param fn: the callable to supervise.
        @param args: positional arguments for C{fn}.
        @param kw: keyword arguments for C{fn}.
        """
        SilentGreenlet.__init__(self, self._run, fn, args, kw)
        self.log = log
        self.clock = clock
        self.max_delay = max_delay
        # Exception types that trigger a restart instead of a failure.
        self.handled = set()
        # Current back-off delay; grows after every failed attempt.
        self._delay = 1
        # Per-attempt time limit (None means no dedicated limit).
        self._timeout = None
        # Overall time limit across all attempts (None means no limit).
        self._abort_timeout = None

    def handle(self, *exctypes):
        """Tell the supervisor that the given exception types are
        valid, and that the supervisor should restart the greenlet if
        one of these errors (or a subclass of one) is encountered.

        @return: C{self}, to allow chaining.
        """
        self.handled.update(exctypes)
        return self

    def timeout(self, timeout):
        """Set a timeout on how long the supervised action may take.

        If it times out, consider it a handled error.

        @return: C{self}, to allow chaining.
        """
        self._timeout = timeout
        return self

    def abort_after(self, timeout):
        """Abort the supervisor if the action hasn't completed within
        the specified time frame, including restarts.

        The limit is checked after each failed or timed-out attempt.
        When no per-attempt timeout has been set, this value is also
        used as the time limit for a single attempt.

        @return: C{self}, to allow chaining.
        """
        self._abort_timeout = timeout
        return self

    def start(self):
        """Start supervising.

        @return: C{self}, to allow chaining.
        """
        SilentGreenlet.start(self)
        return self

    def close(self):
        """Terminate the supervisor."""
        self.kill(SupervisorKilledError())

    def _run(self, fn, args, kw):
        """Run C{fn} until it succeeds, restarting it on handled errors.

        @return: the return value of the first successful run.
        @raise UnhandledExceptionError: C{fn} raised an exception that
            is not an instance of any handled type.
        @raise SupervisorAbortedError: the abort time limit was
            exceeded.
        @raise SupervisorKilledError: the supervisor was killed.
        """
        started_at = self.clock.time()
        while True:
            g = SilentGreenlet.spawn(fn, *args, **kw)
            try:
                # Do the timeout in an inner try-except block so that
                # a Timeout that is not ours is not mistaken for a
                # timed-out attempt and is re-raised instead.
                timeout = gevent.Timeout(
                    self._timeout or self._abort_timeout)
                try:
                    with timeout:
                        return g.get()
                except gevent.Timeout as t:
                    if t is not timeout:
                        raise
                    self.log.info("supervised action timed out")
            except SupervisorKilledError:
                self.log.info("supervisor killed")
                raise
            except Exception as e:
                # isinstance() rather than an exact type match, so
                # subclasses of a handled type are handled as well.
                if not isinstance(e, tuple(self.handled)):
                    self.log.error("got *UNHANDLED* exception",
                                   exc_info=g._exc_info)
                    raise UnhandledExceptionError(e, g._exc_info)
                self.log.info("caught handled exception: %r" % (
                        g._exception,), exc_info=g._exc_info)
            finally:
                # Never leave an attempt running behind our back: after
                # a timeout or a kill the child may still be alive and
                # would otherwise run concurrently with its replacement.
                # Killing an already finished greenlet is a no-op.
                g.kill(block=False)

            time_spent = self.clock.time() - started_at
            if self._abort_timeout and time_spent > self._abort_timeout:
                self.log.error("supervisor aborted: timed out")
                raise SupervisorAbortedError()

            # Back off: grow the delay by a random factor between 1x
            # and 2x, but never sleep longer than max_delay.
            self._delay += random.random() * self._delay
            delay = min(self.max_delay, self._delay)
            self.log.debug("will restart action in %f seconds" % (delay,))
            try:
                self.clock.sleep(delay)
            except SupervisorKilledError:
                self.log.info("supervisor killed")
                raise
def supervise(log, time, max_delay, fn, *args, **kw):
    """Create a supervisor for function C{fn}.

    This will keep running C{fn} until it succeeds, provided that the
    errors can be handled by the supervisor.

    The supervisor will do exponential back-off, up to C{max_delay}
    seconds.  C{time} is a clock-like object that is used to read the
    current time (C{time()}) and to sleep (C{sleep()}).

    The supervisor is returned unstarted; configure it and then call
    C{start()} on it.

    @param log: logger-like object used by the supervisor.
    @param time: clock-like object.
    @param max_delay: maximum delay between restarts, in seconds.
    @param fn: the callable to supervise.
    @param args: positional arguments passed to C{fn} on every run.
    @param kw: keyword arguments passed to C{fn} on every run.
    @return: the new supervisor.
    """
    return _Supervisor(log, time, max_delay, fn, args, kw)
class SupervisorTestCase(TestCase):
    """Test cases for our supervisor."""

    def setUp(self):
        self.log = mock()
        # NOTE(review): presumably a clock exposing time() and sleep(),
        # as the supervisor requires -- confirm against its definition.
        self.clock = Clock()
        self.max_delay = 10

    def test_unhandled_errors_is_propagated(self):
        """An exception type that was never registered with handle()
        surfaces as UnhandledExceptionError."""
        def test():
            raise ValueError("HELLO WORLD")

        s = supervise(self.log, self.clock, self.max_delay, test)
        self.assertRaises(UnhandledExceptionError, s.start().get)

    def test_handled_errors_restarts_function(self):
        """A handled exception restarts the action, and the result of
        the successful run becomes the supervisor's result."""
        # Records one entry per invocation; kept in the enclosing
        # scope instead of a mutable default argument.
        attempts = []

        def test():
            attempts.append('x')
            if len(attempts) == 1:
                raise ValueError("WHAT")
            return len(attempts)

        s = supervise(self.log, self.clock, self.max_delay, test).handle(
            ValueError).start()
        self.assertEqual(2, s.get())

    def test_supervisor_aborts_after_timeout(self):
        """An action that always fails with a handled error makes the
        supervisor abort once the abort time limit is exceeded."""
        def test():
            # Raise explicitly: an ``assert`` statement is stripped
            # when Python runs with -O, which would let the action
            # succeed.
            raise AssertionError("oh no")

        supervisor = supervise(self.log, self.clock, self.max_delay, test)
        supervisor.handle(AssertionError).abort_after(2)
        self.assertRaises(SupervisorAbortedError, supervisor.start().get)

    def test_supervisor_action_timeout_restarts_action(self):
        """An action that exceeds its per-attempt timeout is restarted
        until the abort time limit is exceeded."""
        calls = []

        def test(calls=calls):
            calls.append(1)
            gevent.sleep(10)

        supervisor = supervise(self.log, self.clock, self.max_delay, test)
        supervisor.timeout(1).abort_after(3)
        self.assertRaises(SupervisorAbortedError, supervisor.start().get)
        # Just make sure that we have called test more than once.
        self.assertTrue(len(calls) > 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment