Skip to content

Instantly share code, notes, and snippets.

@saltycrane
Created May 6, 2010 05:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save saltycrane/391821 to your computer and use it in GitHub Desktop.
Save saltycrane/391821 to your computer and use it in GitHub Desktop.
"""
This is a simple Redis-based queue. Two features that I needed were
uniqueness (i.e. if an item exists in the queue already, it won't be
added again) and a delay like beanstalkd where an item must wait a
specified time before it can be popped from the queue. There are a
number of other Redis-based queues that have many more features but
I didn't see one that had these two features together. This 50-line
class works for my needs. It may or may not work for you. Feel free
to copy this and build on it.
Some links related to Redis queues:
http://github.com/defunkt/resque
http://github.com/samuel/dreque
http://github.com/gleicon/restmq
http://github.com/tnm/qr
http://github.com/blog/542-introducing-resque
http://nosql.mypopescu.com/post/366619997/usecase-restmq-a-redis-based-message-queue
http://nosql.mypopescu.com/post/426360602/redis-queues-an-emerging-usecase
"""
import sys
import time
import redis
REDIS_ADDRESS = '127.0.0.6'
class UniqueMessageQueueWithDelay(object):
"""A message queue based on the Redis sorted set data type.
Duplicate items in the queue are not allowed.
A delay may be specified when adding items to the queue.
Items will only be popped after the delay has passed.
Pop() is non-blocking, so polling must be used.
The name of the queue == the Redis key for the sorted set.
"""
def __init__(self, name):
self.name = name
self.redis = redis.Redis(REDIS_ADDRESS)
def add(self, data, delay=0):
"""Add an item to the queue. delay is in seconds.
"""
score = time.time() + delay
self.redis.zadd(self.name, data, score)
debug('Added %.1f, %s' % (score, data))
def pop(self):
"""Pop one item from the front of the queue.
Items are popped only if the delay specified in the add() has passed.
Returns False if no items are available.
"""
min_score = 0
max_score = time.time()
result = self.redis.zrangebyscore(self.name, min_score, max_score,
start=0, num=1,
withscores=False
)
if result == None:
return False
if len(result) == 1:
debug('Popped %s' % result[0])
return result[0]
else:
return False
def remove(self, data):
return self.redis.zrem(self.name, data)
def debug(msg):
print msg
def test_queue():
u = UniqueMessageQueueWithDelay('myqueue')
# add items to the queue
for i in [0, 1, 2, 3, 4, 0, 1]:
data = 'Item %d' % i
delay = 5
u.add(data, delay)
time.sleep(0.1)
# get items from the queue
while True:
print
result = u.pop()
print result
if result != False:
u.remove(result)
time.sleep(1)
if __name__ == '__main__':
test_queue()
# TEST RUN:
# Added 1273125157.4, Item 0
# Added 1273125157.5, Item 1
# Added 1273125157.6, Item 2
# Added 1273125157.7, Item 3
# Added 1273125157.8, Item 4
# Added 1273125157.9, Item 0
# Added 1273125158.0, Item 1
# False
# False
# False
# False
# False
# Popped Item 2
# Item 2
# Popped Item 3
# Item 3
# Popped Item 4
# Item 4
# Popped Item 0
# Item 0
# Popped Item 1
# Item 1
# False
# False
# False
# False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment