Skip to content

Instantly share code, notes, and snippets.

@leifdenby
Created March 29, 2021 09:35
Show Gist options
  • Save leifdenby/88d12536f0eb0a914c81cc8b3554eae6 to your computer and use it in GitHub Desktop.
Save leifdenby/88d12536f0eb0a914c81cc8b3554eae6 to your computer and use it in GitHub Desktop.
Consumer task with dynamic number of requirements
import luigi
def pack(cls, **kwargs):
    """Describe a class and its constructor arguments as a plain dict.

    Args:
        cls: The class to describe; only its ``__name__`` is recorded.
        **kwargs: Keyword arguments to store alongside the class name.

    Returns:
        A dict with two entries: ``"class_name"`` (the class's ``__name__``)
        and ``"kwargs"`` (the keyword arguments as a dict).
    """
    description = {
        "class_name": cls.__name__,
        "kwargs": kwargs,
    }
    return description
def unpack(class_name, kwargs):
    """Instantiate a module-level class from its name and keyword arguments.

    Args:
        class_name: Name of a class available in this module's globals.
        kwargs: Mapping of keyword arguments passed to the class constructor.

    Returns:
        A new instance created as ``TaskClass(**kwargs)``.

    Raises:
        KeyError: If no module-level name ``class_name`` exists.
        TypeError: If ``class_name`` refers to something that is not a class.
    """
    try:
        task_cls = globals()[class_name]
    except KeyError:
        raise KeyError(
            f"unknown task class {class_name!r}; it must be defined in (or "
            "imported into) the module that defines unpack()"
        ) from None
    # The lookup can reach every module global, so refuse anything that is
    # not a class instead of calling an arbitrary function or object.
    if not isinstance(task_cls, type):
        raise TypeError(f"{class_name!r} does not name a class: {task_cls!r}")
    return task_cls(**kwargs)
class ConsumerTask(luigi.Task):
    """Task whose upstream requirements are supplied through a parameter."""

    # Each entry is unpacked as keyword arguments to ``unpack``, so it must be
    # a mapping with "class_name" and "kwargs" keys (the shape ``pack`` builds).
    source_tasks = luigi.ListParameter()

    def requires(self):
        """Return one task instance per entry in ``source_tasks``."""
        upstream = []
        for spec in self.source_tasks:
            upstream.append(unpack(**spec))
        return upstream

    def run(self):
        """Drop into an interactive ipdb session."""
        # Imported lazily so ipdb is only needed when the task actually runs.
        import ipdb

        ipdb.set_trace()
class ProviderTask(luigi.Task):
    """Task that writes a small text file whose name is derived from ``name``."""

    # Label used both in the output filename and in the written content.
    name = luigi.Parameter()

    def run(self):
        """Write ``42 <name>`` to the output target."""
        # Write through the target's own open() rather than the builtin open():
        # luigi stages the data in a temporary file and moves it into place on
        # close, so an interrupted run cannot leave a partial file that would
        # make the task look complete. It also avoids the deprecated ``fn``
        # alias of ``path``.
        with self.output().open("w") as fh:
            fh.write(f"42 {self.name}")

    def output(self):
        """Return the local file target ``data_<name>.txt``."""
        return luigi.LocalTarget(f"data_{self.name}.txt")
class RunAll(luigi.WrapperTask):
    """Entry-point wrapper that requires a ConsumerTask fed by two providers."""

    def requires(self):
        """Return a ConsumerTask built from packed "foo" and "bar" providers."""
        provider_names = ("foo", "bar")
        packed = [pack(ProviderTask, name=label) for label in provider_names]
        return ConsumerTask(source_tasks=packed)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment