Skip to content

Instantly share code, notes, and snippets.

@ericgj
Last active February 27, 2019 21:26
Show Gist options
  • Save ericgj/6491240e2c132c188f29f5f5ac8dd8a0 to your computer and use it in GitHub Desktop.
Save ericgj/6491240e2c132c188f29f5f5ac8dd8a0 to your computer and use it in GitHub Desktop.
sequencing tasks in dramatiq
import dramatiq
@dramatiq.actor
def sequence(environ, sequence, environ_key=None, result_key=None):
""" could be anything, just used to kick off a sequence """
return {
u'environ': environ,
u'count': len(sequence)
}
"""
pass in environ, sequence, environ_key, and result_key
where sequence is a list of RunParams dicts (see middleware below)
"""
def trigger_sequence( **kwargs ):
sequence.send_with_options( args=(), kwargs=kwargs, **kwargs )
import logging
from dramatiq.middleware import Middleware
logger = logging.getLogger(__name__)
class Sequences(Middleware):
"""Middleware that lets you sequence Actors """
@property
def actor_options(self):
return {
"sequence",
"environ",
"environ_key",
"result_key"
}
def after_process_message(self, broker, message, *, result=None, exception=None):
if not exception is None:
return
logger.info("Checking for next task in sequence")
seq = message.options.get(u"sequence", [])
environ = message.options.get(u"environ", {})
if len(seq) < 1:
logger.info("No more tasks in sequence")
return
head, *tail = seq
next_run = RunParams.from_dict(head)
logger.info("Preparing environment for %s" % next_run.actor_name)
environ_key = message.options.get("environ_key")
result_key = message.options.get("result_key")
if not result_key is None:
environ = dict_insert(result_key, result, environ)
if not environ_key is None:
next_run = next_run.add_kwarg(environ_key, environ)
next_run = (
next_run.add_options({
u'sequence': tail,
u'environ': environ,
u'environ_key': environ_key,
u'result_key': result_key
})
)
logger.info("Enqueuing %s" % next_run.actor_name)
actor = broker.get_actor(next_run.actor_name)
actor.send_with_options(
args=next_run.args, kwargs=next_run.kwargs,
delay=next_run.delay, **next_run.options
)
class RunParams:
"""
A way to bundle up all the state needed to call send_with_options
for the next actor in sequence
"""
@classmethod
def from_dict(cls, d):
return cls(
actor_name=d[u'actor_name'],
args=d[u'args'],
kwargs=d[u'kwargs'],
options=d[u'options'],
delay=d.get(u'delay')
)
def __init__(self, *, actor_name, args, kwargs, options, delay=None):
self.actor_name = actor_name
self.args = args
self.kwargs = kwargs
self.options = options
self.delay = delay
def to_dict(self):
return {
u'actor_name': self.actor_name,
u'args': self.args,
u'kwargs': self.kwargs,
u'options': self.options,
u'delay': self.delay
}
def add_kwarg(self, key, value):
return self.__class__(
actor_name=self.actor_name,
args=self.args,
kwargs=dict_insert(key, value, self.kwargs),
options=self.options,
delay=self.delay
)
def add_option(self, key, value):
return self.__class__(
actor_name=self.actor_name,
args=self.args,
kwargs=self.kwargs,
options=dict_insert(key, value, self.options),
delay=self.delay
)
def add_options(self, opts):
return self.__class__(
actor_name=self.actor_name,
args=self.args,
kwargs=self.kwargs,
options=dict_merge(opts, self.options),
delay=self.delay
)
def dict_insert(k,v,d):
d_ = d.copy()
d_.update({k:v})
return d_
def dict_merge(a,b):
b_ = b.copy()
b_.update(a)
return b_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment