Skip to content

Instantly share code, notes, and snippets.

@rredkovich
Created May 27, 2018 14:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rredkovich/3cfac783428a6293480bb0ac73939db0 to your computer and use it in GitHub Desktop.
Save rredkovich/3cfac783428a6293480bb0ac73939db0 to your computer and use it in GitHub Desktop.
Python Tornado: consuming queue tasks in parallel
import time
import logging
from tornado import queues, gen, ioloop
from tornado.options import parse_command_line
# Capacity of the work queue (passed as ``maxsize`` to ``queues.Queue``).
QUEUE_SIZE = 4
# Number of worker coroutines started to consume the queue.
CONCURRENT = 2
class MongoQueue(object):
    """Demo producer/consumer pipeline built on a bounded tornado queue.

    A single producer coroutine keeps putting an incrementing counter on
    the queue while ``CONCURRENT`` worker coroutines take items off it and
    process them.
    """

    def __init__(self):
        super(MongoQueue, self).__init__()
        # Bounded queue shared by the producer and all workers.
        self.queue = queues.Queue(maxsize=QUEUE_SIZE)
        # Loop flag checked by producer() and worker(); nothing in this
        # class ever clears it, so both loops run until the process exits.
        self.running = True
        # Number of items produced so far; doubles as the job payload.
        self.counter = 0

    @gen.coroutine
    def load_work(self):
        """Produce one job and put it on the queue.

        Placeholder for reading a mongo collection and creating a queue
        entry per document; here the job is simply the running counter.
        """
        self.counter += 1
        # Capture the value now so the item queued after the pause is the
        # one that was logged, regardless of what happens to the counter.
        job = self.counter
        logging.info('producing {0} queue size: {1}'.format(job, self.queue.qsize()))
        # Non-blocking pause: time.sleep() here would stall the IOLoop and
        # with it every worker coroutine.
        yield gen.sleep(1)
        yield self.queue.put(job)

    @gen.coroutine
    def process_job(self, job):
        """Process a single job (simulated by a two-second pause).

        The pause yields to the IOLoop so that other workers can process
        their own jobs at the same time.
        """
        yield gen.sleep(2)
        logging.info('consuming {0} queue size: {1}'.format(job, self.queue.qsize()))

    @gen.coroutine
    def worker(self):
        """Take jobs off the queue and process them while ``running`` is set.

        A failing job is logged and does not stop the worker; every job is
        marked done on the queue whether it succeeded or not.
        """
        while self.running:
            job = yield self.queue.get()
            try:
                yield self.process_job(job)
            except Exception as ex:
                logging.exception(ex)
            finally:
                self.queue.task_done()

    @gen.coroutine
    def workers(self):
        """Start the producer and ``CONCURRENT`` workers, then wait on the workers."""
        # The producer is spawned in the background; only the workers are awaited.
        ioloop.IOLoop.current().spawn_callback(self.producer)
        futures = [self.worker() for _ in range(CONCURRENT)]
        yield futures

    @gen.coroutine
    def producer(self):
        """Keep producing jobs, one at a time, while ``running`` is set."""
        while self.running:
            yield self.load_work()
if __name__ == '__main__':
    # Let tornado handle its command-line options first, then force the
    # root logger to DEBUG so every produce/consume message is shown.
    parse_command_line()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # Run the producer/worker pipeline on the current IOLoop.
    pipeline = MongoQueue()
    ioloop.IOLoop.current().run_sync(pipeline.workers)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment