Call the All
class with a list of futures. Returns a future that can be iterated over, yielding each future item in the order it was added.
Inspired by jquery.When.
Call the All
class with a list of futures. Returns a future that can be iterated over, yielding each future item in the order it was added.
Inspired by jquery.When.
import time | |
from tornado.concurrent import Future | |
class All(Future): | |
'''Resolves when all requests are completed | |
''' | |
def __init__(self, futures, verbose=False): | |
super(All, self).__init__() | |
self.start_time = time.time() | |
self.end_time = None | |
self.futures = futures | |
self.ncompleted = 0 | |
self.verbose = verbose | |
for future in self.futures: | |
future.add_done_callback(self.done_callback) | |
def __iter__(self): | |
return AllIterator(self) | |
def done_callback(self, future): | |
self.ncompleted += 1 | |
if self.verbose: | |
print("Completed %s of %s" % (self.ncompleted, len(self.futures))) | |
if self.ncompleted == len(self.futures): | |
self.end_time = time.time() | |
if self.verbose: | |
print("TOTAL TIME: %s s" % (self.end_time - self.start_time)) | |
self.set_result(self) | |
class AllIterator(object): | |
"""Make results iterable | |
""" | |
def __init__(self, all_object): | |
self.all = all_object | |
self.current_index = 0 | |
def next(self): | |
if not self.all.done(): | |
raise Exception("Results not available yet. Wait until `.done()` returns `True`.") | |
elif self.current_index < len(self.all.futures): | |
citem = self.all.futures[self.current_index] | |
self.current_index += 1 | |
return citem | |
else: | |
raise StopIteration() |
from tornado_all import All | |
import tornado.gen | |
from tornado.httpclient import AsyncHTTPClient | |
from tornado.ioloop import IOLoop | |
import signal | |
@tornado.gen.coroutine | |
def test_endpoints(): | |
endpoints = [ | |
"http://echo.jsontest.com/animal/turtle/nlegs/4", | |
"http://echo.jsontest.com/animal/frog/nlegs/2" | |
] | |
c = AsyncHTTPClient() | |
jobs = yield All([c.fetch(endpoint) for endpoint in endpoints]) | |
for job in jobs: | |
try: | |
r = job.result() | |
print(r.code, r.body) | |
except HTTPError as e: | |
print(e.code, e.response) | |
# Stop after test | |
IOLoop.instance().stop() | |
def interrupt_handler(signal, frame): | |
IOLoop.instance().stop() | |
if __name__ == '__main__': | |
signal.signal(signal.SIGINT, interrupt_handler) | |
io_loop = IOLoop.instance() | |
io_loop.add_callback(test_endpoints) | |
IOLoop.instance().start() |