Skip to content

Instantly share code, notes, and snippets.

@hownowstephen
Last active December 3, 2020 23:53
Show Gist options
  • Save hownowstephen/5287376 to your computer and use it in GitHub Desktop.
Save hownowstephen/5287376 to your computer and use it in GitHub Desktop.
Overview of the gevent.queue module
import gevent.queue
queue = gevent.queue.Queue()

# Basic gevent.queue.Queue operations

# Put to the queue
# Good practice is to communicate across the queue using tuples
queue.put((0, 'message'))

# For capped queues (maxsize set), queue.put blocks while the queue is full.
# Use put_nowait, or set the parameters explicitly. block/timeout are
# keyword arguments of put() itself -- they do not go inside the message tuple.
queue.put_nowait((1, 'message'))
# NOTE: with block=False the timeout is ignored; gevent.queue.Full is raised
# straight away when no slot is free. Use put(item, timeout=1) to wait for up
# to one second instead.
queue.put((2, 'message'), block=False, timeout=1)

# Blocking queue get
# (will wait until something is available in the queue to resume)
# Note the tuple unpacking, this allows us to assign variables
# directly (hence the use above of a tuple message)
priority, message = queue.get()

# Nonblocking queue get
# With block=False the call returns an item only if one is immediately
# available, otherwise it raises gevent.queue.Empty; the timeout is ignored.
# To wait for at most one second use queue.get(timeout=1) (block stays True).
queue.get(block=False, timeout=1)
queue.get_nowait()  # alias for queue.get(block=False)

# Peek at the queue
# Same as get, just doesn't remove values from the queue
# The three gets above emptied the queue, so add an item first: a blocking
# peek on an empty queue would wait forever.
queue.put((3, 'message'))
queue.peek()
queue.peek(block=False, timeout=1)
queue.peek_nowait()

# Queues are iterable
# Iterating calls get() repeatedly and only stops once get() returns the
# StopIteration class itself, so enqueue that sentinel to end the loop.
queue.put(StopIteration)
for integer, message in queue:
    # Parenthesised single-argument form prints the same on Python 2 and 3
    print('%s %s' % (integer, message))
## Types of queues and implementations

# Priority queueing
# datetime/timedelta are needed to build the example entries below
from datetime import datetime, timedelta

pq = gevent.queue.PriorityQueue()
# Entries are kept in a heap using the standard-library heapq module.
# Tuples compare element by element, left to right, so entries come out in
# ascending order. Anything that can be compared can be used as an element.
pq.put((1, datetime.now(), 'abc'))  # result 2
pq.put((1, datetime.now() - timedelta(minutes=1), 'bcd'))  # result 1
pq.put((1, datetime.now(), 'bcd'))  # result 3
# Drain with get() while items remain. Iterating with ``for ... in pq`` would
# block after the last entry, because iteration only ends when a StopIteration
# sentinel is retrieved (and that sentinel cannot be ordered against tuples
# on Python 3).
while not pq.empty():
    priority, date, message = pq.get()
    print('%s %s %s' % (priority, date, message))

# Joinable queuing
# A JoinableQueue also tracks unfinished work: consumers call task_done()
# for every item they have processed, and join() blocks until all items that
# were put on the queue have been marked done.
jq = gevent.queue.JoinableQueue()

# Lifo queue
# Implements a very simple last-in, first-out queuing strategy
lq = gevent.queue.LifoQueue()
# Subclassing queues
import random
class ChaosQueue(gevent.queue.Queue):
    '''A subclass of :class:`Queue` that retrieves a random queue entry.

    Only the storage hooks are overridden: ``_init`` creates the backing
    container, ``_put`` stores an item and ``_get`` removes and returns one.
    Blocking, timeouts and size limits are presumably still handled by the
    base class -- confirm against the installed gevent version.
    '''

    def _init(self, maxsize, items=None):
        '''Create the backing list, optionally pre-filled from ``items``.'''
        # A plain list supports pop() at an arbitrary index.
        self.queue = list(items) if items else []

    def _put(self, item):
        '''Store ``item``; position is irrelevant since retrieval is random.'''
        self.queue.append(item)

    def _get(self):
        '''Remove and return a uniformly chosen entry.'''
        # randrange() excludes its upper bound. The previous
        # randint(0, len(self.queue)) could return len(self.queue) and make
        # pop() raise IndexError.
        # NOTE(review): assumes the base class only calls _get when the
        # queue is non-empty; randrange(0) raises ValueError otherwise.
        return self.queue.pop(random.randrange(len(self.queue)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment