Skip to content

Instantly share code, notes, and snippets.

@jdunck
Created November 17, 2012 08:16
Show Gist options
  • Star 17 You must be signed in to star a gist
  • Fork 8 You must be signed in to fork a gist
  • Save jdunck/4094242 to your computer and use it in GitHub Desktop.
Save jdunck/4094242 to your computer and use it in GitHub Desktop.
# Leaky bucket queue - Redis 2.6 + Lua + Python
# Cribbed from http://vimeo.com/52569901 (Twilio carrier call origination moderation).
# The idea is that many fan-in queues can enqueue at any rate, but
# dequeuing must happen in a rate-controlled manner, without allowing
# any individual input queue to starve the other queues.
# See http://en.wikipedia.org/wiki/Leaky_bucket (second sense, "This version is referred to here as the leaky bucket as a queue.")
#
# requires:
# redis 2.6+
# redis-py>=2.7.0
# anyjson
from uuid import uuid4
from time import time
from anyjson import loads, dumps
import redis
# Module-level Redis client built with redis.Redis() defaults; the helpers and
# script registrations in this file all go through it.
# This is obviously dumb - change it to take a connection parameter -
# but it's good for single-file exposition.
c = redis.Redis()
class QueueEmpty(Exception):
    """Raised by dequeue() when the dequeue script hands back no item."""
def define_rate_limits(arena, limits):
    """
    Store per-queue rate limits in the 'rate_limits' hash.

    arena  -- namespace prefix; each hash field is written as "<arena>:<queue_name>".
    limits -- { queue_name -> allowed calls per second }

    An empty (or None) mapping is a no-op: HMSET needs at least one
    field/value pair, so sending an empty mapping would be rejected.
    """
    if not limits:
        return
    namespaced = dict(
        ("%s:%s" % (arena, name), rate) for name, rate in limits.items()
    )
    c.hmset('rate_limits', namespaced)
def get_time():
    """Return the current time() value scaled to whole milliseconds."""
    seconds = time()
    return int(seconds * 1000)
def enqueue(arena, queue, payload):
    """
    Serialize payload with dumps() and hand it to enqueue_script for the
    given arena and queue. Returns nothing.
    """
    serialized = dumps(payload)
    enqueue_script(args=[arena, queue, serialized])
def dequeue(arena, reservation_seconds=10):
    """
    Reserve the next ready item from an arena.

    arena               -- arena namespace passed to the dequeue script.
    reservation_seconds -- reservation duration, passed to the dequeue script
                           as its fourth argument.

    Returns (item, txnid): the payload decoded with loads() and the freshly
    generated transaction id to hand to commit().
    Raises QueueEmpty when the script returns no item.
    """
    # UUID.hex is available on both Python 2 and 3; get_hex() was Python 2 only.
    txnid = uuid4().hex
    item = dequeue_script(args=[arena, get_time(), txnid, reservation_seconds])
    if item is None:
        raise QueueEmpty()
    return loads(item), txnid
def commit(txnid):
    """Run commit_script for the given transaction id and return its reply."""
    reply = commit_script(args=[txnid])
    return reply
def reap():
    """Run reap_script with the current get_time() value and return its reply."""
    now = get_time()
    return reap_script(args=[now])
# Lua script: append a payload to "<arena>:<queue>" and make sure that queue is
# registered in the arena's scheduling sorted set "<arena>:queues".
#   ARGV[1] arena name, ARGV[2] queue name, ARGV[3] payload (already serialized)
# Returns the length of the queue after the push.
enqueue_script = c.register_script("""
-- e.g. evalsha <sha> 0 arena_namespace some_queue payload
if #ARGV < 3 then
    return error("USAGE: arena queue payload")
end
local arena_name = ARGV[1]
local payload = ARGV[3]
local rate_queues = arena_name .. ":queues"
local queue_key = arena_name .. ":" .. ARGV[2]

-- schedule the job.
local queue_length = redis.call('RPUSH', queue_key, payload)

-- Register the queue with an immediate start time only when it is not
-- already in the scheduling set. ZSCORE replies nil (false in Lua) for an
-- absent member. Re-adding with score 0 while the queue is still scheduled
-- would overwrite its next-allowed time and let it skip its rate limit;
-- keying on "length == 1" would also miss a non-empty queue that is absent
-- from the set.
if not redis.call('ZSCORE', rate_queues, queue_key) then
    redis.call('ZADD', rate_queues, 0, queue_key)
end
return queue_length
""")
# Lua script: pop one item from the first queue of an arena whose scheduled
# time has arrived, reschedule that queue according to its rate limit, and
# record the item as an in-flight transaction.
#   ARGV[1] arena name
#   ARGV[2] current time in milliseconds
#   ARGV[3] transaction id
#   ARGV[4] reservation duration in seconds
# Returns the popped item, or false (nil to the client) when no queue is ready.
dequeue_script = c.register_script("""
-- e.g. evalsha <sha> 0 arena_namespace, time uuid() 10
if #ARGV < 4 then
    return error("USAGE: arena current_time txnid reservation_duration")
end
local arena_name = ARGV[1]
local current_time = tonumber(ARGV[2])
local txnid = ARGV[3]
local reservation_seconds = tonumber(ARGV[4])
if not current_time or not reservation_seconds then
    return error("USAGE: arena current_time txnid reservation_duration")
end
-- current_time is in milliseconds while the reservation is given in seconds;
-- convert so the reaping deadline is on the same clock as current_time.
local reservation_ms = reservation_seconds * 1000
local rate_queues = arena_name .. ":queues"

while true do -- loop until there are no ready queues or we find an unempty queue.
    -- pick a queue with a ready time from the dawn of time until now, but only one queue.
    local queue_names = redis.call("ZRANGEBYSCORE", rate_queues, 0, current_time, "LIMIT", 0, 1)
    if #queue_names == 0 then
        -- no queue is allowed to run yet.
        return false
    end
    local queue_name = queue_names[1]
    if redis.call("LLEN", queue_name) == 0 then
        -- the queue was empty; remove it from the managed queues.
        redis.call("ZREM", rate_queues, queue_name)
    else
        -- rate_limits is an assumed hash with calls allowed per queue per second.
        -- for sharded throughput, set it to overall_limit / #shards
        -- Look the rate up BEFORE popping: failing after LPOP would lose the item.
        local rate = tonumber(redis.call("HGET", "rate_limits", queue_name))
        if not rate then
            return error("no numeric rate limit defined for " .. queue_name)
        end
        local rate_delay = 1000 / rate
        local item = redis.call("LPOP", queue_name)
        -- reschedule the next check of this queue
        redis.call("ZADD", rate_queues, current_time + rate_delay, queue_name)
        -- add the item to in-flight transactions
        redis.call("HMSET", "txn:" .. txnid, "data", item, "queue_name", queue_name)
        -- and schedule transaction reaping if not committed in time.
        redis.call("ZADD", "transactions", current_time + reservation_ms, txnid)
        return item
    end
end
""")
# Lua script: finish an in-flight transaction.
#   ARGV[1] transaction id
# Deletes the "txn:<txnid>" key and removes the id from the "transactions"
# sorted set; the script's reply is the result of that ZREM.
commit_script = c.register_script("""
-- e.g. evalsha <sha> 0 previous_uuid
if #ARGV < 1 then
return error("USAGE: txnid")
end
local txnid = ARGV[1]
redis.call("DEL", "txn:" .. txnid)
return redis.call("ZREM", "transactions", txnid)
""")
# Lua script: return expired in-flight transactions to their input queues.
#   ARGV[1] current time (same clock as the scores in "transactions")
# For every id in "transactions" scored between 0 and ARGV[1], the stored
# payload is pushed back onto the head of its queue and "txn:<id>" is deleted.
# Returns the number of entries removed from "transactions".
reap_script = c.register_script("""
-- e.g. evalsha <sha> time.time()
if #ARGV < 1 then
    return error("USAGE: current_time")
end
local now = ARGV[1]
local txns = "transactions"
-- find expired transactions
local expired_txns = redis.call("ZRANGEBYSCORE", txns, 0, now)
for _, txnid in ipairs(expired_txns) do
    local item_key = "txn:" .. txnid
    local fields = redis.call("HMGET", item_key, "data", "queue_name")
    local data, queue_name = fields[1], fields[2]
    -- HMGET yields false for missing fields. Passing false to LPUSH raises and
    -- would abort the script before the cleanup below, so every later reap
    -- would fail on the same entry. Skip such entries instead.
    if data and queue_name then
        -- add the txn back to the head of the input queue
        -- NOTE(review): this script does not touch any "<arena>:queues"
        -- scheduling set (the txn hash does not record the arena). Confirm
        -- that a queue absent from that set gets re-registered elsewhere.
        redis.call("LPUSH", queue_name, data)
    end
    -- remove the in-flight transaction
    redis.call("DEL", item_key)
end
-- remove the expired transactions from the reaping set.
return redis.call("ZREMRANGEBYSCORE", txns, 0, now)
""")
import redis_leaky_bucket
# Rate limits for the 'site_crawl' arena: github at most 10/s, wikipedia 5/s.
crawl_limits = {'github.com': 10, 'wikipedia.org': 5}
redis_leaky_bucket.define_rate_limits('site_crawl', crawl_limits)
# Seed each domain's queue with its start page (wikipedia first, then github).
seed_urls = (
    ('wikipedia.org', "http://www.wikipedia.org"),
    ('github.com', "http://www.github.com"),
)
for seed_domain, seed_url in seed_urls:
    redis_leaky_bucket.enqueue('site_crawl', seed_domain, seed_url)
# as many workers as you want.
import time, requests, lxml.html, urlparse
import redis_leaky_bucket
# Domains whose links get enqueued. They are compared AFTER a leading "www"
# label has been stripped, so entries must be written without it
# ('wikipedia.org', not 'www.wikipedia.org') or they never match.
interesting_domains = set(['github.com', 'wikipedia.org'])
# in each worker; as many workers as you want:
while True:
    try:
        url, txnid = redis_leaky_bucket.dequeue('site_crawl', reservation_seconds=30)
    except redis_leaky_bucket.QueueEmpty:
        print("nothing to do, sleeping.")
        time.sleep(1)
        continue
    response = requests.get(url)
    if response.status_code != 200:
        # Unsuccessful fetch: commit so this URL is not handed out again.
        redis_leaky_bucket.commit(txnid)
        continue
    tree = lxml.html.fromstring(response.content)
    tree.make_links_absolute(url)
    for elem, attr, val, _ in tree.iterlinks():
        # Only follow <a href="..."> links.
        if not (elem.tag == 'a' and attr == 'href'):
            continue
        domain_parts = urlparse.urlparse(val).netloc.split('.')
        if domain_parts[0] == 'www':
            domain_parts = domain_parts[1:]
        domain = ".".join(domain_parts)
        if domain in interesting_domains:
            # really, you'd want a bloom filter or something to limit recrawls.
            # see https://github.com/seomoz/pyreBloom
            print("enqueue %s" % val)
            redis_leaky_bucket.enqueue('site_crawl', domain, val)
        else:
            print("skipping %s" % domain)
    # All links processed: mark the reservation as done.
    redis_leaky_bucket.commit(txnid)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment