@tomdottom
Created August 1, 2019 15:40
Distributed rate-limited task queues.
"""demo.py
Distributed rate limited task queues.
Uses a leaky-bucket token-queue to ensure multiple workers don't exceed global rate limit.
# Setup & Dependencies
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3
pip install kombu
# Term 1 & 2
python demo.py worker
# Term 3
python demo.py tokens
# Term 4
python demo.py scheduler
"""
from __future__ import absolute_import, unicode_literals

from functools import partial
from time import sleep
import datetime
import itertools
import sys

from kombu import Connection, Exchange, Queue
BROKER_URL = "amqp://localhost:5672//"

TASK_EXCHANGE = Exchange("tasks", type="direct")

TASK_QUEUES = {
    # Token bucket: holds at most 10 tokens, which caps the burst size.
    "token-bucket": Queue(
        "token-bucket", TASK_EXCHANGE, routing_key="token-bucket", max_length=10
    ),
    "tasks": Queue("tasks", TASK_EXCHANGE, routing_key="tasks"),
}


def timestep(timeout=1):
    # Tick forever: yield the current time, then sleep for `timeout` seconds.
    while True:
        yield datetime.datetime.now()
        sleep(timeout)


def publish(queue, message):
    # Publish a plain-text message to the queue's exchange.
    message = queue.exchange.Message(body=message, content_encoding="utf-8")
    return queue.exchange.publish(message=message, routing_key=queue.routing_key)


def init():
    print("Initialising queues")
    with Connection(BROKER_URL) as conn:
        channel = conn.channel()
        task_queue = TASK_QUEUES["tasks"](channel)
        token_queue = TASK_QUEUES["token-bucket"](channel)
        task_queue.declare()
        token_queue.declare()


def worker():
    print("Starting worker")
    with Connection(BROKER_URL) as conn:
        channel = conn.channel()
        task_queue = TASK_QUEUES["tasks"](channel)
        token_queue = TASK_QUEUES["token-bucket"](channel)
        for dt in timestep(0.1):
            # A task may only be handled after claiming a token.
            token = token_queue.get()
            if not token:
                continue
            task_message = task_queue.get()
            if not task_message:
                # No task waiting: return the token so it isn't wasted.
                token.requeue()
                continue
            token.ack()
            print("Received: {0}".format(task_message.payload))
            task_message.ack()


def tokens():
    print("Starting tokens generator")
    with Connection(BROKER_URL) as conn:
        channel = conn.channel()
        token_queue = TASK_QUEUES["token-bucket"](channel)
        token_put = partial(publish, token_queue)
        # One token per second sets the global rate limit; max_length=10
        # on the queue caps how many unused tokens can accumulate.
        for dt in timestep(1):
            message = "Token created at {0}".format(dt)
            token_put(message)
            print(message)


def scheduler(num=20):
    print("Starting scheduler")
    print("Sending {0} messages".format(num))
    with Connection(BROKER_URL) as conn:
        channel = conn.channel()
        task_queue = TASK_QUEUES["tasks"](channel)
        task_put = partial(publish, task_queue)
        for dt in itertools.islice(timestep(0.05), 0, num):
            message = "helloworld, sent at {0}".format(dt)
            task_put(message)
            print("Sent: {0}".format(message))


if __name__ == "__main__":
    try:
        # Look up the requested component (worker/tokens/scheduler) by name.
        component_name = sys.argv[1]
        component = locals()[component_name]
        init()
        component()
    except (IndexError, KeyError):
        print("Start a worker, tokens generator, and scheduler in separate terminals")
        print("python demo.py {worker,tokens,scheduler}")