Created
October 12, 2012 03:52
-
-
Save ajdavis/3877210 to your computer and use it in GitHub Desktop.
Tornado AsyncResult proposal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| import time | |
| from functools import partial | |
| from tornado import stack_context | |
| from tornado.ioloop import IOLoop | |
class NotReady(Exception):
    """Raised by ``AsyncResult.get()`` when it is called without a callback
    before any value has been set.
    """
class AlreadySet(Exception):
    """Raised by ``AsyncResult.set()`` when the result already holds a value."""
| def _check_callback(callback): | |
| """ | |
| Toro runs this on any callback before registering on IOLoop for future | |
| execution, to reduce confusion about the source of a TypeError. Note that | |
| calling a callback directly, or putting it in a _Waiter, don't require | |
| check_callback(). | |
| """ | |
| if not callable(callback): | |
| raise TypeError( | |
| "callback must be callable, not %s" % repr(callback)) | |
class ToroBase(object):
    """Shared base class: stores an IOLoop and runs callbacks defensively."""

    def __init__(self, io_loop):
        # A falsy io_loop (e.g. None) selects IOLoop.instance().
        self.io_loop = io_loop if io_loop else IOLoop.instance()

    def _run_callback(self, callback, *args, **kwargs):
        """Invoke callback(*args, **kwargs), routing any Exception it raises
        to handle_callback_exception() instead of letting it propagate.
        """
        try:
            callback(*args, **kwargs)
        except Exception:
            self.handle_callback_exception(callback)

    def handle_callback_exception(self, callback):
        """Report an exception raised by a callback that Toro ran.

        The default implementation logs the exception as an error.
        Subclasses may override this hook to report failures differently.

        The exception is not passed as an argument; it is available via
        sys.exc_info() because this method is called from inside the
        ``except`` block.

        Modeled on IOLoop's method of the same name.
        """
        logging.error("Exception in callback %r", callback, exc_info=True)
class _Waiter(ToroBase):
    """Internal Toro utility class: a deferred callback that runs at most
    once, either when run() is called explicitly or when its timeout fires.
    """

    def __init__(self, timeout, timeout_args, io_loop, callback):
        """
        Create a deferred callback.

        :param timeout: None, or the number of seconds in the future at
            which to time-out this waiter. Note that this is a relative
            duration, not a "deadline" Unix timestamp as in Tornado.
        :param timeout_args: positional arguments; callback(*timeout_args)
            is executed if the timeout fires before run() is called.
        :param io_loop: the IOLoop to schedule the timeout on; a falsy value
            selects IOLoop.instance().
        :param callback: the callable to defer.
        :raises TypeError: if callback is not callable.

        Waiters are only ever executed once.
        """
        super(_Waiter, self).__init__(io_loop)
        _check_callback(callback)

        # Capture the current stack context so it's restored when we're run
        # after deferral. Assigned before the timeout is registered so that
        # run() never observes a half-initialized waiter.
        self.callback = stack_context.wrap(callback)

        # Handle of the pending IOLoop timeout, kept so it can be cancelled
        # once the waiter has run; None when there is nothing to cancel.
        self._timeout_handle = None
        if timeout is not None:
            self._timeout_handle = self.io_loop.add_timeout(
                time.time() + timeout,
                partial(self._on_timeout, *timeout_args))

    def _on_timeout(self, *args):
        """IOLoop timeout entry point: the timeout has already fired, so
        forget its handle rather than cancelling it, then run the callback.
        """
        self._timeout_handle = None
        self.run(*args)

    def run(self, *args, **kwargs):
        """Run the deferred callback with the given arguments, unless this
        waiter has already run (or timed out), in which case do nothing.
        """
        if not self.callback:
            return

        callback, self.callback = self.callback, None

        # Cancel the still-pending timeout; otherwise the IOLoop would keep
        # this waiter and its timeout arguments alive until the deadline
        # just to perform a no-op.
        handle, self._timeout_handle = self._timeout_handle, None
        if handle is not None:
            self.io_loop.remove_timeout(handle)

        # Clear the current stack context and run in the context that was
        # captured when we initialized.
        with stack_context.NullContext():
            self._run_callback(callback, *args, **kwargs)

    @property
    def expired(self):
        """True once the callback has been consumed by run() or a timeout."""
        return not self.callback
class AsyncResult(ToroBase):
    """A one-time event that stores a value.

    Calling :meth:`set` stores the value and wakes every waiter registered
    through :meth:`get`, passing each of them the value. An
    :class:`AsyncResult` cannot be reset: a second :meth:`set` raises
    :class:`AlreadySet`.

    Once a value has been set, :meth:`get` without a callback returns it
    directly:

    >>> result = AsyncResult()
    >>> result.set(100)
    >>> result.get()
    100
    """

    def __init__(self, io_loop=None):
        super(AsyncResult, self).__init__(io_loop)
        self.waiters = []
        self.value = None
        self._ready = False

    def set(self, value):
        """Store *value* and run every pending waiter with it.

        Raises AlreadySet if a value was stored previously.
        """
        if self._ready:
            raise AlreadySet

        self._ready = True
        self.value = value

        # Detach the pending list before running anything, so the waiters
        # being run are no longer reachable through self.waiters.
        pending = self.waiters
        self.waiters = []
        for waiter in pending:
            waiter.run(value)

    def ready(self):
        """Return True once :meth:`set` has been called."""
        return self._ready

    def get(self, callback=None, timeout=None):
        """Fetch the value, either immediately or through *callback*.

        - Value set, no callback: return the value (non-blocking get).
        - Value set, callback given: schedule callback(value) on the IOLoop.
        - Value not set, no callback: raise NotReady.
        - Value not set, callback given: register a waiter that :meth:`set`
          will run with the value; if *timeout* (seconds) is not None and
          elapses first, the callback is passed None instead.

        Raises TypeError if a truthy *callback* is not callable.
        """
        if not self.ready():
            if not callback:
                raise NotReady
            # After timeout, callback will be passed None
            self.waiters.append(
                _Waiter(timeout, (None,), self.io_loop, callback))
            return None

        if not callback:
            # Non-blocking get
            return self.value

        _check_callback(callback)
        self.io_loop.add_callback(partial(callback, self.value))
        return None
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import unittest | |
| from tornado import stack_context, ioloop | |
| import async_result | |
class TestAsyncResult(unittest.TestCase):
    # Exercises AsyncResult's exception isolation and stack-context handling
    # on the singleton IOLoop. Each test stops the loop after 20 ms.

    def test_exc(self):
        # Test that raising an exception from a get() callback doesn't
        # propagate up to set()'s caller, and that StackContexts are correctly
        # managed
        result = async_result.AsyncResult()
        loop = ioloop.IOLoop.instance()
        loop.add_timeout(time.time() + .02, loop.stop)

        # A dict (rather than Python 3's nonlocal keyword) stores results
        # from inner functions, which keeps the test runnable on Python 2.
        outcomes = {
            'value': None,
            'set_result_exc': None,
            'get_result_exc': None,
        }

        def set_result():
            try:
                result.set('hello')
            # "except ... as" is valid on Python 2.6+ and Python 3; the old
            # comma form is a SyntaxError on Python 3.
            except Exception as e:
                outcomes['set_result_exc'] = e

        def callback(value):
            outcomes['value'] = value
            assert False

        def catch_get_result_exception(exc_type, value, traceback):
            outcomes['get_result_exc'] = exc_type

        with stack_context.ExceptionStackContext(catch_get_result_exception):
            result.get(callback)

        loop.add_timeout(time.time() + .01, set_result)
        loop.start()

        self.assertEqual(outcomes['value'], 'hello')
        self.assertEqual(outcomes['get_result_exc'], AssertionError)
        self.assertEqual(outcomes['set_result_exc'], None)

    def test_stack_context_leak(self):
        # Test that stack context isn't leaked from set() to a deferred get().
        # This is different from test_exc -- test_exc tests that _Waiter calls
        # stack_context.wrap() so a stack context we set when we scheduled the
        # get() is restored when get's callback is run. Here, we test that if
        # get()'s callback schedules another callback that raises an exception,
        # that exception isn't caught by set()'s stack context.
        result = async_result.AsyncResult()
        loop = ioloop.IOLoop.instance()
        loop.add_timeout(time.time() + .02, loop.stop)

        # A dict (rather than Python 3's nonlocal keyword) stores results
        # from inner functions, which keeps the test runnable on Python 2.
        outcomes = {
            'value': None,
            'set_result_exc': None,
        }

        def set_result():
            with stack_context.ExceptionStackContext(catch_set_result_exc):
                result.set('hello')

        def get_callback(value):
            outcomes['value'] = value
            loop.add_callback(raise_callback)

        def raise_callback():
            assert False

        def catch_set_result_exc(exc_type, value, traceback):
            outcomes['set_result_exc'] = exc_type
            return True

        result.get(get_callback)
        loop.add_timeout(time.time() + .01, set_result)
        loop.start()

        self.assertEqual(outcomes['value'], 'hello')
        self.assertEqual(outcomes['set_result_exc'], None)
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment