Skip to content

Instantly share code, notes, and snippets.

@miohtama
Last active November 6, 2021 01:47
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save miohtama/6b455958cc88a873e8b659346c71a23a to your computer and use it in GitHub Desktop.
Throttling background task queue using Dramatiq - wait until more workers are freed
import time
from typing import Dict, Optional
from dramatiq import Message, Broker
from dramatiq.results import ResultMissing
class TaskQueue:
    """A throttling task queue on top of the Dramatiq background task framework.

    https://dramatiq.io/

    The purpose of this queue is to feed tasks to the Dramatiq workers
    as fast as they can handle them. The queue should be set to the same size
    as the total number of worker threads available.

    This is useful for running task sets that are CPU bound:
    you know the number of CPUs and the load capacity of a server and
    you do not want to exceed this capacity.

    When the queue is fully allocated, adding a task blocks until
    existing tasks are complete and there is free space again.

    Results are read with ``message.get_result()``, so the broker must be
    configured so that task results can be fetched (see the example below).

    An incomplete usage example:

    .. code-block:: python

        import random
        import time

        # Setup Dramatiq
        import dramatiq
        from dramatiq.brokers.redis import RedisBroker
        from dramatiq.results import Results
        from dramatiq.results.backends import RedisBackend

        # _client is your Redis client instance
        redis_result_backend = RedisBackend(client=_client)
        redis_broker = RedisBroker(client=_client)
        redis_broker.add_middleware(Results(backend=redis_result_backend))
        dramatiq.set_broker(redis_broker)

        @dramatiq.actor
        def long_process(id: int) -> int:
            '''Example blocking background worker task'''
            r = random.randint(1, 5)
            print(f"Background process {id} sleeping {r} seconds")
            time.sleep(r)
            return r

        # Assume we have started dramatiq worker controller with 2 processes, 5 threads
        queue = TaskQueue(redis_broker, 10)

        # We are going to generate more messages than we have workers,
        # so after 10 messages the loop starts to slow down,
        # while the queue is blocking until more background workers
        # are cleared up
        total_slept = 0
        for i in range(100):
            message = long_process.message(i)
            results = queue.add_and_wait_until_space(message)
            for slept_time in results:
                total_slept += slept_time

        # Tasks still running when the loop ends are not included above;
        # poll queue.scoop_finished() to collect their results.
        print(f"Background workers slept {total_slept} seconds so far")
    """

    def __init__(self, broker: "Broker", size: int, poll_delay: float = 0.5):
        """Create a queue with a fixed number of task slots.

        :param broker: Broker the messages are enqueued to
        :param size: Maximum number of tasks in flight at the same time
        :param poll_delay: Seconds to sleep between polls while the queue is full
        :raise ValueError: If ``size`` is less than one
        """
        # A queue without slots could never accept a task and
        # add_and_wait_until_space() would block forever.
        if size < 1:
            raise ValueError(f"Queue size must be at least 1, got {size}")

        self.broker = broker
        # slot number -> message occupying the slot, None marks a free slot
        self.tasks: Dict[int, Optional[Message]] = dict.fromkeys(range(size))
        self.poll_delay = poll_delay

    def add_and_wait_until_space(self, m: "Message") -> list:
        """Add a new task.

        Sends a new task to the background workers as soon as there is a free
        slot in the queue. If all slots are taken then block, polling the
        running tasks until at least one of them has finished.

        :param m: Message to enqueue
        :return: List of results of previous tasks that were found completed
            while waiting; empty if a slot was free right away
        """
        ready = []

        while True:
            free_slot = self.peak_free_slot()
            if free_slot is not None:
                break

            finished = self.scoop_finished()
            if finished:
                # At least one slot was just released, pick it up on the next round
                ready += finished
            else:
                # Everything is still running, wait before polling again
                time.sleep(self.poll_delay)

        # Assign the task to the queue
        self.tasks[free_slot] = m

        # Send the task to the worker pool
        self.broker.enqueue(m)

        return ready

    def peak_free_slot(self) -> Optional[int]:
        """Get the first free slot in the queue (if any).

        :return: Slot number, or None if every slot is occupied
        """
        for slot, message in self.tasks.items():
            if message is None:
                return slot
        return None

    def scoop_finished(self) -> list:
        """Find finished tasks and make space in the queue.

        A task counts as finished when ``get_result()`` returns instead of
        raising ``ResultMissing``. Any other exception is propagated to the
        caller. Free slots are skipped, so this is safe to call at any time.

        :return: Results of completed tasks
        """
        ready = {}

        for slot, message in self.tasks.items():
            if message is None:
                # Free slot, nothing to poll
                continue

            try:
                result = message.get_result()
            except ResultMissing:
                # No result available yet, keep the slot occupied
                continue

            ready[slot] = result

        # Release the slots of the finished tasks
        for ready_slot in ready:
            self.tasks[ready_slot] = None

        return list(ready.values())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment