Skip to content

Instantly share code, notes, and snippets.

@ajdavis
Created October 12, 2012 03:52
Show Gist options
  • Select an option

  • Save ajdavis/3877210 to your computer and use it in GitHub Desktop.

Select an option

Save ajdavis/3877210 to your computer and use it in GitHub Desktop.
Tornado AsyncResult proposal
import logging
import time
from functools import partial
from tornado import stack_context
from tornado.ioloop import IOLoop
class NotReady(Exception):
    """Raised by a callback-less AsyncResult.get() when no value is set yet."""
class AlreadySet(Exception):
    """Raised by AsyncResult.set() when a value has already been stored."""
def _check_callback(callback):
"""
Toro runs this on any callback before registering on IOLoop for future
execution, to reduce confusion about the source of a TypeError. Note that
calling a callback directly, or putting it in a _Waiter, don't require
check_callback().
"""
if not callable(callback):
raise TypeError(
"callback must be callable, not %s" % repr(callback))
class ToroBase(object):
    """Common base for Toro objects: holds an IOLoop and runs callbacks
    with their exceptions contained.
    """

    def __init__(self, io_loop):
        # Fall back to IOLoop.instance() when the caller supplies no loop.
        self.io_loop = io_loop if io_loop else IOLoop.instance()

    def _run_callback(self, callback, *args, **kwargs):
        """Call ``callback(*args, **kwargs)``; any Exception it raises is
        handed to handle_callback_exception() instead of propagating.
        """
        try:
            callback(*args, **kwargs)
        except Exception:
            self.handle_callback_exception(callback)

    def handle_callback_exception(self, callback):
        """Hook invoked whenever a callback run by Toro raises an exception.

        The default implementation just logs the exception as an error.
        Subclasses may override this method to customize how exceptions are
        reported.

        The exception is not passed in explicitly; it is available through
        sys.exc_info().

        Copied from IOLoop's implementation.
        """
        logging.exception("Exception in callback %r", callback)
class _Waiter(ToroBase):
    """Internal Toro utility class: a deferred callback that runs at most
    once, either when run() is called or when its optional timeout fires.
    """

    def __init__(self, timeout, timeout_args, io_loop, callback):
        """
        Create a deferred callback.

        :param timeout: if not None, the number of seconds in the future at
            which to time-out this waiter. Note that this is seconds into the
            future, not a "deadline" in Unix timestamp as in Tornado.
        :param timeout_args: positional arguments; ``callback(*timeout_args)``
            is executed if the timeout fires before run() is called.
        :param io_loop: the IOLoop to schedule the timeout on, or a false
            value to use IOLoop.instance().
        :param callback: the callable to defer.
        :raises TypeError: if ``callback`` is not callable.

        Waiters are only ever executed once.
        """
        super(_Waiter, self).__init__(io_loop)
        _check_callback(callback)

        # Capture the current stack context so it's restored when we're run
        # after deferral.
        self.callback = stack_context.wrap(callback)

        # Keep the handle of the pending timeout so run() can cancel it;
        # otherwise a waiter that has already run would stay referenced by
        # the IOLoop until the deadline passes.
        self._timeout_handle = None
        if timeout is not None:
            self._timeout_handle = self.io_loop.add_timeout(
                time.time() + timeout,
                partial(self._on_timeout, *timeout_args))

    def _on_timeout(self, *timeout_args):
        """Timeout entry point: the timeout has fired, so forget its handle
        (there is nothing left to cancel) and run the callback.
        """
        self._timeout_handle = None
        self.run(*timeout_args)

    def run(self, *args, **kwargs):
        """Run the callback with the given arguments, unless this waiter has
        already run. Exceptions raised by the callback are reported through
        handle_callback_exception() rather than propagated to the caller.
        """
        if self.callback is None:
            # Already executed (or timed out); waiters run at most once.
            return

        callback, self.callback = self.callback, None

        # Cancel the still-pending timeout, if any: it can no longer do
        # anything useful.
        handle, self._timeout_handle = self._timeout_handle, None
        if handle is not None:
            self.io_loop.remove_timeout(handle)

        # Clear the current stack context and run in the context that was
        # captured when we initialized.
        with stack_context.NullContext():
            self._run_callback(callback, *args, **kwargs)

    @property
    def expired(self):
        """True once the callback has been run, by run() or by the timeout."""
        return self.callback is None
class AsyncResult(ToroBase):
    """A one-time event that stores a value.

    Callbacks registered with :meth:`get` before the value is available are
    all run when :meth:`set` is called. An :class:`AsyncResult` instance
    cannot be reset: a second :meth:`set` raises :class:`AlreadySet`.

    To pass a value call :meth:`set`. Calls to :meth:`get` (those currently
    waiting as well as those made in the future) will receive the value:

    >>> result = AsyncResult()
    >>> result.set(100)
    >>> result.get()
    100
    """

    def __init__(self, io_loop=None):
        """
        :param io_loop: the IOLoop used to schedule callbacks and timeouts;
            IOLoop.instance() is used when omitted.
        """
        super(AsyncResult, self).__init__(io_loop)
        self._ready = False
        self.value = None
        self.waiters = []

    def set(self, value):
        """Store ``value`` and run every waiting callback with it.

        Waiters are run immediately, in the order they were registered.
        Exceptions raised by their callbacks are reported through the
        waiter's handle_callback_exception() and do not reach the caller.

        :raises AlreadySet: if a value has already been stored.
        """
        if self._ready:
            raise AlreadySet

        self.value = value
        self._ready = True

        # Detach the list before running so the result holds no stale waiters.
        waiters, self.waiters = self.waiters, []
        for waiter in waiters:
            waiter.run(value)

    def ready(self):
        """Return True once :meth:`set` has been called."""
        return self._ready

    def get(self, callback=None, timeout=None):
        """Fetch the value, either directly or through ``callback``.

        * With no callback this is a non-blocking get: the stored value is
          returned, or :class:`NotReady` is raised if there is none yet.
        * With a callback and a value already stored, ``callback(value)`` is
          scheduled on the IOLoop; ``timeout`` is not used in this case.
        * With a callback and no value yet, the callback is queued and later
          called with the value passed to :meth:`set`. If ``timeout`` (seconds
          from now) is not None and elapses first, the callback is called
          with None instead.

        :raises NotReady: no callback was given and no value is stored.
        :raises TypeError: ``callback`` is not None and not callable.
        """
        # Compare against None explicitly: a truthiness test would treat a
        # falsy callable as "no callback" and let a falsy non-callable skip
        # the TypeError check.
        if self.ready():
            if callback is None:
                # Non-blocking get
                return self.value

            _check_callback(callback)
            self.io_loop.add_callback(partial(callback, self.value))
        else:
            if callback is None:
                raise NotReady

            # After timeout, callback will be passed None
            self.waiters.append(
                _Waiter(timeout, (None,), self.io_loop, callback))
import time
import unittest
from tornado import stack_context, ioloop
import async_result
class TestAsyncResult(unittest.TestCase):
    """Checks how AsyncResult isolates callback exceptions and handles
    Tornado stack contexts.
    """

    def test_exc(self):
        # Test that raising an exception from a get() callback doesn't
        # propagate up to set()'s caller, and that StackContexts are correctly
        # managed
        result = async_result.AsyncResult()
        loop = ioloop.IOLoop.instance()
        loop.add_timeout(time.time() + .02, loop.stop)

        # Absent Python 3's nonlocal keyword, we need some place to store
        # results from inner functions
        outcomes = {
            'value': None,
            'set_result_exc': None,
            'get_result_exc': None,
        }

        def set_result():
            try:
                result.set('hello')
            except Exception as e:
                outcomes['set_result_exc'] = e

        def callback(value):
            outcomes['value'] = value
            # Raise explicitly rather than `assert False`, which is stripped
            # when Python runs with -O.
            raise AssertionError()

        def catch_get_result_exception(exc_type, exc_value, exc_traceback):
            outcomes['get_result_exc'] = exc_type

        with stack_context.ExceptionStackContext(catch_get_result_exception):
            result.get(callback)

        loop.add_timeout(time.time() + .01, set_result)
        loop.start()

        self.assertEqual(outcomes['value'], 'hello')
        self.assertEqual(outcomes['get_result_exc'], AssertionError)
        self.assertEqual(outcomes['set_result_exc'], None)

    def test_stack_context_leak(self):
        # Test that stack context isn't leaked from set() to a deferred get().
        # This is different from test_exc -- test_exc tests that _Waiter calls
        # stack_context.wrap() so a stack context we set when we scheduled the
        # get() is restored when get's callback is run. Here, we test that if
        # get()'s callback schedules another callback that raises an exception,
        # that exception isn't caught by set()'s stack context.
        result = async_result.AsyncResult()
        loop = ioloop.IOLoop.instance()
        loop.add_timeout(time.time() + .02, loop.stop)

        # Absent Python 3's nonlocal keyword, we need some place to store
        # results from inner functions
        outcomes = {
            'value': None,
            'set_result_exc': None,
        }

        def set_result():
            with stack_context.ExceptionStackContext(catch_set_result_exc):
                result.set('hello')

        def get_callback(value):
            outcomes['value'] = value
            loop.add_callback(raise_callback)

        def raise_callback():
            # Raise explicitly rather than `assert False`, which is stripped
            # when Python runs with -O.
            raise AssertionError()

        def catch_set_result_exc(exc_type, exc_value, exc_traceback):
            outcomes['set_result_exc'] = exc_type
            return True

        result.get(get_callback)
        loop.add_timeout(time.time() + .01, set_result)
        loop.start()

        self.assertEqual(outcomes['value'], 'hello')
        self.assertEqual(outcomes['set_result_exc'], None)
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment