Skip to content

Instantly share code, notes, and snippets.

@MaxMorais
Last active March 7, 2017 00:02
Show Gist options
  • Save MaxMorais/d111f3c4c74e053bdddfa9466548f7e6 to your computer and use it in GitHub Desktop.
Save MaxMorais/d111f3c4c74e053bdddfa9466548f7e6 to your computer and use it in GitHub Desktop.
#-*- coding: utf-8 -*-
"""RoundRobin is a Python library that allows Redis to be used as a message queue."""
from functools import wraps
import multiprocessing
from redis import Redis
try:
import cPickle as pickle
except ImportError:
import pickle
def key_for_name(name):
    """Return the Redis key under which the queue called *name* is stored.

    The queue name is lower-cased and prefixed with ``roundrobin:``.
    """
    lowered = name.lower()
    return "roundrobin:{}".format(lowered)
class RoundRobinAgent(object):
    """Base class for all roundrobin subclasses.

    Stores the queue name and the serializer, and creates the Redis client
    used by the queue operations.

    :param name: name of the queue; :func:`key_for_name` turns it into the
                 Redis key.
    :param serializer: object stored as :attr:`serializer` (``pickle`` by
                       default).
    :param kwargs: keyword arguments passed unchanged to :class:`Redis`.
    """

    def __init__(self, name, serializer=pickle, **kwargs):
        self.name = name
        self.serializer = serializer
        self._redis = Redis(**kwargs)

    def __len__(self):
        """Return the length of the Redis list stored under :attr:`key`."""
        return self._redis.llen(self.key)

    @property
    def key(self):
        """Return the key name used to read this queue in Redis."""
        # Computed on every access. The former cache tested
        # ``hasattr(self, '__key')`` while the assignment ``self.__key`` was
        # name-mangled to ``_RoundRobinAgent__key``, so the test was never
        # true and the key was rebuilt on each call anyway.
        return key_for_name(self.name)

    def clear(self):
        """Clear the queue of all messages, deleting the Redis key."""
        self._redis.delete(self.key)

    def worker(self, *args, **kwargs):
        """Decorator for using a function as a queue worker. Example:

        >>> @consumer.worker(timeout=1)
        ... def printer(msg):
        ...     print(msg)
        >>> printer()
        my message
        another message

        You can also use it without passing any keyword arguments:

        >>> @consumer.worker
        ... def printer(msg):
        ...     print(msg)
        >>> printer()
        my message
        another message

        Calling the decorated function iterates over
        ``self.consume(**kwargs)`` and calls the original function once per
        message, passing the message as the last positional argument after
        any arguments given to the decorated function. ``consume`` is not
        defined on this base class; it has to be provided by the subclass.

        :param args: when non-empty, unpacked into the inner decorator; the
                     bare ``@consumer.worker`` form supplies the function
                     this way.
        :param kwargs: keyword arguments forwarded to ``self.consume``.
        :returns: the wrapped function when used bare, otherwise a decorator.
        """
        def decorator(worker):
            @wraps(worker)
            def wrapper(*call_args, **call_kwargs):
                for msg in self.consume(**kwargs):
                    # The message always goes last so that the caller's own
                    # positional arguments keep their positions.
                    worker(*(call_args + (msg,)), **call_kwargs)
            return wrapper

        if args:
            # Bare usage: ``worker`` was called with the function itself.
            return decorator(*args)
        return decorator
class Consumer(RoundRobinAgent):
    """Simple FIFO Queue consumer

    >>> from roundrobin import Consumer
    >>> queue = Consumer("myqueue", host="localhost", port=6739, db=0)

    :param name: name of the queue.
    :param serializer: the class or module used to deserialize msgs; it must
        provide a function or method named ``loads``.
        `pickle <http://docs.python.org/library/pickle.html>`_ is the default,
        use ``None`` to read messages as stored, without deserializing
        (suitable for strings, integers, etc.)
    :param kwargs: additional kwargs to pass to :class:`Redis`, most commonly
        :attr:`host`, :attr:`port`, :attr:`db`
    """

    def consume(self, **kwargs):
        """Return a generator that yields whenever a message is waiting in
        the queue. Will block otherwise. Example:

        >>> for msg in queue.consume(timeout=1):
        ...     print(msg)
        my message
        another message

        Iteration stops as soon as :meth:`get` returns ``None``, or when a
        ``KeyboardInterrupt`` is raised while consuming.

        :param kwargs: any arguments that :meth:`get` can accept
            (:attr:`block` will default to ``True`` if not given)
        """
        kwargs.setdefault('block', True)
        try:
            while True:
                msg = self.get(**kwargs)
                if msg is None:
                    break
                yield msg
        except KeyboardInterrupt:
            # Emit a newline before stopping. ``print("")`` does this on both
            # Python 2 and Python 3; a bare ``print`` does nothing on Python 3.
            print("")
            return

    def get(self, block=False, timeout=None):
        """Return a message from the queue. Example:

        >>> consumer.get()
        'my message'
        >>> consumer.get()
        'another message'

        :param block: whether or not to wait until a msg is available in the
            queue before returning; ``False`` by default.
        :param timeout: when using :attr:`block`, if no msg is available
            for :attr:`timeout` in seconds, give up and return ``None``
        :returns: the message, passed through ``serializer.loads`` unless the
            serializer is ``None``; ``None`` when no message was obtained.
        """
        if block:
            if timeout is None:
                # BLPOP treats a timeout of 0 as "wait indefinitely".
                timeout = 0
            reply = self._redis.blpop(self.key, timeout=timeout)
            # BLPOP answers with a (key, value) pair, or None on timeout.
            msg = None if reply is None else reply[1]
        else:
            # Non-blocking read must use LPOP. BLPOP would wait for a message
            # and hand back a (key, value) pair instead of the bare value.
            msg = self._redis.lpop(self.key)
        if msg is not None and self.serializer is not None:
            # NOTE(review): with the default pickle serializer this unpickles
            # whatever is stored under the key; only safe if every producer
            # writing to this Redis instance is trusted.
            msg = self.serializer.loads(msg)
        return msg
class Producer(RoundRobinAgent):
    """
    Simple Redis Queue Producer

    >>> from roundrobin import Producer
    >>> producer = Producer(host="localhost", port=6739, db=0)
    >>> producer.put("myqueue", "my message")

    A producer is not bound to a single queue: the target queue name is
    given on every :meth:`put` and :meth:`clear` call.

    :param serializer: the class or module used to serialize msgs; it must
        provide a function or method named ``dumps``.
        `pickle <http://docs.python.org/library/pickle.html>`_ is the default,
        use ``None`` to store messages as given, without serializing
        (suitable for strings, integers, etc).
    :param kwargs: additional kwargs to pass to :class:`Redis`, most commonly
        :attr:`host`, :attr:`port`, :attr:`db`
    """

    def __init__(self, serializer=pickle, **kwargs):
        # The base class wants a queue name; a producer has none of its own,
        # so an empty string is passed.
        super(Producer, self).__init__('', serializer, **kwargs)

    def clear(self, queue):
        """Delete the Redis key backing *queue*, dropping all its messages."""
        self._redis.delete(key_for_name(queue))

    def put(self, queue, *msgs):
        """Append one or more messages to the end of *queue*. Example:

        >>> producer.put("myqueue", "my message")
        >>> producer.put("myqueue", "another message")

        Several messages can be pushed in one call, which can be
        significantly faster when there are many of them:

        >>> producer.put("myqueue", "my message", "another message", "third message")

        :returns: whatever the Redis client's ``rpush`` call returns.
        """
        payloads = msgs
        if self.serializer is not None:
            dumps = self.serializer.dumps
            payloads = [dumps(item) for item in msgs]
        target = key_for_name(queue)
        return self._redis.rpush(target, *payloads)
if __name__ == '__main__':
    # Demo: spread 100 messages over ten queues, then drain every queue in
    # its own process. Uses a Redis server on localhost:13000.
    queues = ["queue{:02d}".format(i) for i in range(10)]
    lqueue = len(queues)

    def consume(name):
        """Print every message of the queue called *name* until it ends."""
        consumer = Consumer(name, host="localhost", port=13000)

        @consumer.worker(timeout=20)
        def printer(msg):
            try:
                print(msg)
            except Exception as e:
                # Report the failing message on the 'error' queue. ``producer``
                # is the module-level instance created below, before any
                # consumer process is started.
                producer.put('error', (msg, str(e)))

        printer()

    producer = Producer(host='localhost', port=13000)

    for i in range(100):
        queue = queues[i % lqueue]
        msg = 'Message {:03d}'.format(i)
        # Single-argument print keeps the "<queue> <length>" output identical
        # on Python 2 and Python 3.
        print("{} {}".format(queue, producer.put(queue, msg)))

    # A None message makes Consumer.consume stop, so each worker exits once
    # it reaches it instead of waiting for the timeout.
    for name in queues:
        producer.put(name, None)

    # Start every consumer before joining any of them so they run in
    # parallel, and wait for all of them before the queues are cleared.
    processes = [
        multiprocessing.Process(target=consume, args=(name,))
        for name in queues
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    for queue in queues:
        producer.clear(queue)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment