Skip to content

Instantly share code, notes, and snippets.

@edwardgeorge
Created September 30, 2012 16:58
Show Gist options
  • Save edwardgeorge/3807592 to your computer and use it in GitHub Desktop.
Save edwardgeorge/3807592 to your computer and use it in GitHub Desktop.
Consumer for blocking on multiple eventlet queues.
""" Consumer for blocking on multiple eventlet queues.
Ensures that you never take more items from the queues than just
the first item you receive, and only when explicitly waiting for
and item.
"""
import eventlet
from eventlet.event import Event
from eventlet.green import Queue
class MultiQueueConsumer(object):
def __init__(self, queues):
self.cancelled = False
self.event = Event()
self.queues = queues
class Waiter(object):
def __init__(self, consumer, queue):
self.consumer = consumer
self.queue = queue
@property
def cancelled(self):
return self.consumer.cancelled
def switch(self, item):
if self.cancelled or self.consumer.event.ready():
self.queue.queue.appendleft(item)
self.queue._schedule_unlock()
else:
self.consumer.event.send((self.queue, item))
def kill(self, *exc_info):
if not self.cancelled and not self.consumer.event.ready():
self.consumer.event.send(exc=exc_info)
def wait(self, timeout=None, return_queue=False):
empty_queues = []
for q in self.queues:
try:
if return_queue:
return q, q.get_nowait()
else:
return q.get_nowait()
except Queue.Empty:
empty_queues.append(q)
for q in empty_queues:
q.getters.add(self.Waiter(self, q))
self.cancelled = False
try:
with eventlet.Timeout(timeout, exception=Queue.Empty):
if return_queue:
return self.event.wait()
else:
return self.event.wait()[1]
finally:
self.cancelled = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment