Created
May 6, 2010 05:47
-
-
Save saltycrane/391821 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a simple Redis-based queue. Two features that I needed were | |
uniqueness (i.e. if an item exists in the queue already, it won't be | |
added again) and a delay like beanstalkd where an item must wait a | |
specified time before it can be popped from the queue. There are a | |
number of other Redis-based queues that have many more features but | |
I didn't see one that had these two features together. This 50-line | |
class works for my needs. It may or may not work for you. Feel free | |
to copy this and build on it. | |
Some links related to Redis queues: | |
http://github.com/defunkt/resque | |
http://github.com/samuel/dreque | |
http://github.com/gleicon/restmq | |
http://github.com/tnm/qr | |
http://github.com/blog/542-introducing-resque | |
http://nosql.mypopescu.com/post/366619997/usecase-restmq-a-redis-based-message-queue | |
http://nosql.mypopescu.com/post/426360602/redis-queues-an-emerging-usecase | |
""" | |
import sys | |
import time | |
import redis | |
REDIS_ADDRESS = '127.0.0.6' | |
class UniqueMessageQueueWithDelay(object): | |
"""A message queue based on the Redis sorted set data type. | |
Duplicate items in the queue are not allowed. | |
A delay may be specified when adding items to the queue. | |
Items will only be popped after the delay has passed. | |
Pop() is non-blocking, so polling must be used. | |
The name of the queue == the Redis key for the sorted set. | |
""" | |
def __init__(self, name): | |
self.name = name | |
self.redis = redis.Redis(REDIS_ADDRESS) | |
def add(self, data, delay=0): | |
"""Add an item to the queue. delay is in seconds. | |
""" | |
score = time.time() + delay | |
self.redis.zadd(self.name, data, score) | |
debug('Added %.1f, %s' % (score, data)) | |
def pop(self): | |
"""Pop one item from the front of the queue. | |
Items are popped only if the delay specified in the add() has passed. | |
Returns False if no items are available. | |
""" | |
min_score = 0 | |
max_score = time.time() | |
result = self.redis.zrangebyscore(self.name, min_score, max_score, | |
start=0, num=1, | |
withscores=False | |
) | |
if result == None: | |
return False | |
if len(result) == 1: | |
debug('Popped %s' % result[0]) | |
return result[0] | |
else: | |
return False | |
def remove(self, data): | |
return self.redis.zrem(self.name, data) | |
def debug(msg): | |
print msg | |
def test_queue(): | |
u = UniqueMessageQueueWithDelay('myqueue') | |
# add items to the queue | |
for i in [0, 1, 2, 3, 4, 0, 1]: | |
data = 'Item %d' % i | |
delay = 5 | |
u.add(data, delay) | |
time.sleep(0.1) | |
# get items from the queue | |
while True: | |
result = u.pop() | |
print result | |
if result != False: | |
u.remove(result) | |
time.sleep(1) | |
if __name__ == '__main__': | |
test_queue() | |
# TEST RUN: | |
# Added 1273125157.4, Item 0 | |
# Added 1273125157.5, Item 1 | |
# Added 1273125157.6, Item 2 | |
# Added 1273125157.7, Item 3 | |
# Added 1273125157.8, Item 4 | |
# Added 1273125157.9, Item 0 | |
# Added 1273125158.0, Item 1 | |
# False | |
# False | |
# False | |
# False | |
# False | |
# Popped Item 2 | |
# Item 2 | |
# Popped Item 3 | |
# Item 3 | |
# Popped Item 4 | |
# Item 4 | |
# Popped Item 0 | |
# Item 0 | |
# Popped Item 1 | |
# Item 1 | |
# False | |
# False | |
# False | |
# False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment