public
Last active

  • Download Gist
genmod.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
import functools
import sys
import types
 
from tornado.gen import (engine, YieldPoint, Multi, Task, Runner, Arguments,
LeakedCallbackError, BadYieldError)
from tornado.stack_context import ExceptionStackContext, wrap
 
 
def async(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
runner = None
 
def handle_exception(typ, value, tb):
if runner is not None:
return runner.handle_exception(typ, value, tb)
return False
 
# Extract callback, so it is not passed to the wrapped function
callback = None
if 'callback' in kwargs:
callback = wrap(kwargs.pop('callback'))
 
with ExceptionStackContext(handle_exception) as deactivate:
ret_value = None
 
# Execute function. If function raises StopIteration, it is same as if
# it returned something, but did not do anything asynchronously.
try:
gen = func(*args, **kwargs)
except StopIteration, ex:
if ex.args:
ret_value = ex.args
 
gen = None
except:
raise
 
if isinstance(gen, types.GeneratorType):
runner = AsyncRunner(gen, deactivate, callback)
runner.run()
return
 
# Not asynchronous, clean up, run callback
assert gen is None, gen
deactivate()
 
if callback is not None:
if ret_value:
callback(*ret_value[0], **ret_value[1])
else:
callback(None)
 
return wrapper
 
 
class AsyncRunner(Runner):
def __init__(self, gen, deactivate, callback):
super(AsyncRunner, self).__init__(gen, deactivate)
self._callback = callback
 
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
 
try:
self.running = True
 
while True:
if self.exc_info is None:
try:
if not self.yield_point.is_ready():
return
next = self.yield_point.get_result()
except Exception:
self.exc_info = sys.exc_info()
 
try:
if self.exc_info is not None:
self.had_exception = True
exc_info = self.exc_info
self.exc_info = None
yielded = self.gen.throw(*exc_info)
else:
yielded = self.gen.send(next)
except StopIteration, ex:
self.finished = True
 
if self.pending_callbacks and not self.had_exception:
# If we ran cleanly without waiting on all callbacks
# raise an error (really more of a warning). If we
# had an exception then some callbacks may have been
# orphaned, so skip the check in that case.
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
 
self.deactivate_stack_context()
 
# Run callback if it was provided
if self._callback is not None:
if ex.args:
self._callback(*ex.args[0], **ex.args[1])
else:
self._callback(None)
 
return
except Exception:
self.finished = True
raise
if isinstance(yielded, list):
yielded = Multi(yielded)
if isinstance(yielded, YieldPoint):
self.yield_point = yielded
try:
self.yield_point.start(self)
except Exception:
self.exc_info = sys.exc_info()
else:
self.exc_info = (BadYieldError("yielded unknown object %r" % yielded),)
finally:
self.running = False
 
 
def ret(*args, **kwargs):
raise StopIteration(args, kwargs)
test_genmod.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
from time import time
 
from tornado.ioloop import IOLoop
 
import genmod as gen
 
def eq_(a, b):
assert a == b
 
def test_ping():
@gen.async
def long_op(x, y):
yield gen.Task(io_loop.add_timeout, time() + 0.1)
gen.ret(x * y)
 
@gen.async
def sync_op(x, y):
gen.ret(x + y)
 
@gen.async
def sync_check(a):
if a == 10:
gen.ret(15)
 
@gen.async
def async_check(a):
yield gen.Task(io_loop.add_timeout, time() + 0.1)
if a == 10:
gen.ret(15)
 
gen.ret(20)
 
@gen.async
def raise_exc():
raise Exception('test')
 
@gen.async
def args(a, b):
gen.ret(a, b)
 
@gen.async
def kwargs(a, b):
yield gen.Task(io_loop.add_timeout, time() + 0.1)
 
gen.ret(a=a, b=b)
 
@gen.async
def proc():
# Make calls
a = yield gen.Task(long_op, 2, 3)
b = yield gen.Task(sync_op, a, 4)
eq_(b, 10)
 
# Check default return value (None)
res = yield gen.Task(sync_check, 10)
eq_(res, 15)
 
res = yield gen.Task(sync_check, 15)
eq_(res, None)
 
# Async check
res = yield gen.Task(async_check, 10)
eq_(res, 15)
 
res = yield gen.Task(async_check, 15)
eq_(res, 20)
 
# Exception check
try:
res = yield gen.Task(raise_exc)
except Exception, ex:
eq_(ex.message, 'test')
 
# Result tuple
res = yield gen.Task(args, 10, 20)
eq_(res.args, (10, 20))
 
# Named arguments
res = yield gen.Task(kwargs, 10, 20)
eq_(res.kwargs['a'], 10)
eq_(res.kwargs['b'], 20)
 
# Finish test
io_loop.stop()
 
io_loop = IOLoop.instance()
io_loop.add_callback(proc)
io_loop.start()
 
if __name__ == '__main__':
test_ping()

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.