Last active
February 27, 2019 21:26
-
-
Save ericgj/6491240e2c132c188f29f5f5ac8dd8a0 to your computer and use it in GitHub Desktop.
sequencing tasks in dramatiq
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dramatiq | |
@dramatiq.actor | |
def sequence(environ, sequence, environ_key=None, result_key=None): | |
""" could be anything, just used to kick off a sequence """ | |
return { | |
u'environ': environ, | |
u'count': len(sequence) | |
} | |
""" | |
pass in environ, sequence, environ_key, and result_key | |
where sequence is a list of RunParams dicts (see middleware below) | |
""" | |
def trigger_sequence( **kwargs ): | |
sequence.send_with_options( args=(), kwargs=kwargs, **kwargs ) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from dramatiq.middleware import Middleware | |
logger = logging.getLogger(__name__) | |
class Sequences(Middleware): | |
"""Middleware that lets you sequence Actors """ | |
@property | |
def actor_options(self): | |
return { | |
"sequence", | |
"environ", | |
"environ_key", | |
"result_key" | |
} | |
def after_process_message(self, broker, message, *, result=None, exception=None): | |
if not exception is None: | |
return | |
logger.info("Checking for next task in sequence") | |
seq = message.options.get(u"sequence", []) | |
environ = message.options.get(u"environ", {}) | |
if len(seq) < 1: | |
logger.info("No more tasks in sequence") | |
return | |
head, *tail = seq | |
next_run = RunParams.from_dict(head) | |
logger.info("Preparing environment for %s" % next_run.actor_name) | |
environ_key = message.options.get("environ_key") | |
result_key = message.options.get("result_key") | |
if not result_key is None: | |
environ = dict_insert(result_key, result, environ) | |
if not environ_key is None: | |
next_run = next_run.add_kwarg(environ_key, environ) | |
next_run = ( | |
next_run.add_options({ | |
u'sequence': tail, | |
u'environ': environ, | |
u'environ_key': environ_key, | |
u'result_key': result_key | |
}) | |
) | |
logger.info("Enqueuing %s" % next_run.actor_name) | |
actor = broker.get_actor(next_run.actor_name) | |
actor.send_with_options( | |
args=next_run.args, kwargs=next_run.kwargs, | |
delay=next_run.delay, **next_run.options | |
) | |
class RunParams: | |
""" | |
A way to bundle up all the state needed to call send_with_options | |
for the next actor in sequence | |
""" | |
@classmethod | |
def from_dict(cls, d): | |
return cls( | |
actor_name=d[u'actor_name'], | |
args=d[u'args'], | |
kwargs=d[u'kwargs'], | |
options=d[u'options'], | |
delay=d.get(u'delay') | |
) | |
def __init__(self, *, actor_name, args, kwargs, options, delay=None): | |
self.actor_name = actor_name | |
self.args = args | |
self.kwargs = kwargs | |
self.options = options | |
self.delay = delay | |
def to_dict(self): | |
return { | |
u'actor_name': self.actor_name, | |
u'args': self.args, | |
u'kwargs': self.kwargs, | |
u'options': self.options, | |
u'delay': self.delay | |
} | |
def add_kwarg(self, key, value): | |
return self.__class__( | |
actor_name=self.actor_name, | |
args=self.args, | |
kwargs=dict_insert(key, value, self.kwargs), | |
options=self.options, | |
delay=self.delay | |
) | |
def add_option(self, key, value): | |
return self.__class__( | |
actor_name=self.actor_name, | |
args=self.args, | |
kwargs=self.kwargs, | |
options=dict_insert(key, value, self.options), | |
delay=self.delay | |
) | |
def add_options(self, opts): | |
return self.__class__( | |
actor_name=self.actor_name, | |
args=self.args, | |
kwargs=self.kwargs, | |
options=dict_merge(opts, self.options), | |
delay=self.delay | |
) | |
def dict_insert(k,v,d): | |
d_ = d.copy() | |
d_.update({k:v}) | |
return d_ | |
def dict_merge(a,b): | |
b_ = b.copy() | |
b_.update(a) | |
return b_ | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment