Skip to content

Instantly share code, notes, and snippets.



Created Jan 23, 2020
What would you like to do?
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from app.track_progress import TrackProgress
import dramatiq
redis_broker = RedisBroker(url="redis://localhost:6379")
backend = RedisBackend()
# Schedule task
task = my_task.send(user_id)
task_id = task.message_id
task_progress = backend.client.lindex(task_id, 0)
task_progress = backend.encoder.decode(task_progress) if task_progress else None
users_tasks = backend.client.lindex(user_id, 0)
users_tasks = backend.encoder.decode(users_tasks) if users_tasks else []
"""Dramatiq middleware for tracking task progress."""
from dramatiq.middleware import Middleware
class TrackProgress(Middleware):
"""Middleware for tracking progress."""
def __init__(self, *, backend=None, track_progress: bool = True, progress_ttl: int = 20):
self.backend = backend
self.track_progress = track_progress
self.progress_ttl = self.minutes_to_ms(progress_ttl)
def minutes_to_ms(self, minutes: int) -> int:
return minutes * 60 * 1000
def actor_options(self):
return {"track_progress", "progress_ttl"}
def store(self, broker, job_id, message, data):
"""Store given data."""
actor = broker.get_actor(message.actor_name)
track_progress = actor.options.get("track_progress", self.track_progress)
ttl = actor.options.get("progress_ttl", self.progress_ttl)
if track_progress:
self.backend._store(job_id, data, ttl) # noqa: SF01
def before_enqueue(self, broker, message, delay):
"""Save task id before a message is enqueued."""
user_id = message.args[0]
data = self.backend.client.lindex(user_id, 0)
if data is None:
data = []
data = self.backend.encoder.decode(data)
data.append(message.message_id), user_id, message, data)
def after_enqueue(self, broker, message, delay):
"""Save task when it's enqueued."""
data = {
"id": message.message_id,
"status": "enqueued",
"result": None,
}, message.message_id, message, data)
def before_process_message(self, broker, message):
"""Update task's status before its processed."""
data = {
"id": message.message_id,
"status": "started",
"result": None,
}, message.message_id, message, data)
def after_process_message(self, broker, message, *, result=None, exception=None):
"""Save task's results."""
if exception is None:
data = {
"id": message.message_id,
"status": "finished",
"result": str(result),
}, message.message_id, message, data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.