Skip to content

Instantly share code, notes, and snippets.

@turtlemonvh
Last active August 31, 2015 15:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save turtlemonvh/28465b95e5893833bdd5 to your computer and use it in GitHub Desktop.
Save turtlemonvh/28465b95e5893833bdd5 to your computer and use it in GitHub Desktop.
Tornado All

Usage

Call the All class with a list of futures. Returns a future that can be iterated over, yielding each future item in the order it was added.

Inspired by jquery.When.

import time
from tornado.concurrent import Future
class All(Future):
'''Resolves when all requests are completed
'''
def __init__(self, futures, verbose=False):
super(All, self).__init__()
self.start_time = time.time()
self.end_time = None
self.futures = futures
self.ncompleted = 0
self.verbose = verbose
for future in self.futures:
future.add_done_callback(self.done_callback)
def __iter__(self):
return AllIterator(self)
def done_callback(self, future):
self.ncompleted += 1
if self.verbose:
print("Completed %s of %s" % (self.ncompleted, len(self.futures)))
if self.ncompleted == len(self.futures):
self.end_time = time.time()
if self.verbose:
print("TOTAL TIME: %s s" % (self.end_time - self.start_time))
self.set_result(self)
class AllIterator(object):
"""Make results iterable
"""
def __init__(self, all_object):
self.all = all_object
self.current_index = 0
def next(self):
if not self.all.done():
raise Exception("Results not available yet. Wait until `.done()` returns `True`.")
elif self.current_index < len(self.all.futures):
citem = self.all.futures[self.current_index]
self.current_index += 1
return citem
else:
raise StopIteration()
from tornado_all import All
import tornado.gen
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
import signal
@tornado.gen.coroutine
def test_endpoints():
endpoints = [
"http://echo.jsontest.com/animal/turtle/nlegs/4",
"http://echo.jsontest.com/animal/frog/nlegs/2"
]
c = AsyncHTTPClient()
jobs = yield All([c.fetch(endpoint) for endpoint in endpoints])
for job in jobs:
try:
r = job.result()
print(r.code, r.body)
except HTTPError as e:
print(e.code, e.response)
# Stop after test
IOLoop.instance().stop()
def interrupt_handler(signal, frame):
IOLoop.instance().stop()
if __name__ == '__main__':
signal.signal(signal.SIGINT, interrupt_handler)
io_loop = IOLoop.instance()
io_loop.add_callback(test_endpoints)
IOLoop.instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment