Skip to content

Instantly share code, notes, and snippets.

@dlstadther
Created March 19, 2018 19:48
Show Gist options
  • Save dlstadther/a76d4b597c8154019ea08aa036df8a18 to your computer and use it in GitHub Desktop.
Save dlstadther/a76d4b597c8154019ea08aa036df8a18 to your computer and use it in GitHub Desktop.
"""
Test pipeline to test out complex dependency building
http://bionics.it/posts/luigi-tutorial
Concept breaks down when dynamic dependencies are introduced - ListParameters of Tasks cannot be serialized currently
# Note to self:
execute via `~/venv/luigi-3/bin/python3 test_dep.py --local-scheduler TaskA`
"""
import datetime
import luigi
__author__ = 'Dillon Stadther'
__date__ = '2017-06-09'
class path(luigi.Config):
    """Configuration holder for the location where the test tasks write files.

    The class name is intentionally left lowercase; presumably it doubles as
    the name of the configuration section luigi reads -- confirm before
    renaming.
    """

    # Required (no default). TaskA prepends this value verbatim -- without
    # inserting any separator -- to each task's file name, so it is expected
    # to already end with whatever separator is wanted.
    tmp_path = luigi.Parameter()
class TaskA(luigi.WrapperTask):
    """Entry-point wrapper that builds and requests the whole test graph.

    Dependency layout assembled in ``requires``:

    * TaskA requires task_f, task_c and task_b
    * task_b requires task_d
    * task_c requires task_d and task_e
    * task_d, task_e and task_f are created with ``upstream_task=[None]``,
      i.e. without any real upstream task
    """

    # NOTE: the default is evaluated once, when this class body is executed,
    # not each time the task is instantiated. The value is not read anywhere
    # in this class.
    date = luigi.DateHourParameter(default=datetime.datetime.utcnow())

    # File-name suffixes for the individual tasks. These are plain class
    # attributes (not luigi parameters); param_a is not used in this class.
    param_a = 'task_a'
    param_b = 'task_b'
    param_c = 'task_c'
    param_d = 'task_d'
    param_e = 'task_e'
    param_f = 'task_f'

    def requires(self):
        """Yield one list holding the three tasks this wrapper depends on.

        Every task's ``param`` is the configured ``path().tmp_path`` followed
        directly by the task's suffix.
        """

        def target_path(suffix):
            # Prefix and suffix are joined as-is; no separator is added.
            return '{}{}'.format(path().tmp_path, suffix)

        def leaf(task_cls, suffix):
            # A task without real upstream work: its upstream list is [None].
            return task_cls(param=target_path(suffix), upstream_task=[None])

        # Leaves first, so the inner nodes can reference them.
        leaf_f = leaf(TaskF, self.param_f)
        leaf_e = leaf(TaskB, self.param_e)
        leaf_d = leaf(TaskB, self.param_d)

        node_c = TaskB(
            param=target_path(self.param_c),
            upstream_task=[leaf_d, leaf_e],
        )
        node_b = TaskB(
            param=target_path(self.param_b),
            upstream_task=[leaf_d],
        )

        yield [leaf_f, node_c, node_b]
class TaskB(luigi.Task):
    """Minimal task that produces an empty file at ``param``.

    Its requirements are not fixed in the class; they are whatever the
    creator passed in as ``upstream_task``.
    """

    # Path of the local file this task produces.
    param = luigi.Parameter()
    # Value handed back unchanged from requires(). TaskA passes either a list
    # of task instances or [None] here.
    upstream_task = luigi.Parameter()

    def requires(self):
        """Return the upstream value exactly as it was supplied."""
        upstream = self.upstream_task
        return upstream

    def output(self):
        """Return the local file target located at ``param``."""
        target = luigi.LocalTarget(self.param)
        return target

    def run(self):
        """Create the output file without writing anything into it."""
        # Opening for writing and closing right away leaves an empty file.
        handle = self.output().open('w')
        handle.close()
class TaskF(TaskB):
    """Example task for inheritance, overriding only the ``run`` method.

    Parameters, ``requires`` and ``output`` are inherited from TaskB; the
    only difference is that the output file receives some text instead of
    being left empty.
    """

    def run(self):
        """Write the text ``inheritance`` to the task's output target."""
        # Write through the target's own writer (as TaskB.run does) instead
        # of calling builtin open() on the final path. LocalTarget.open('w')
        # writes to a temporary file and moves it into place on close, so an
        # interrupted run cannot leave a half-written file that would make
        # the task look complete.
        with self.output().open('w') as outfile:
            outfile.write('inheritance')
if __name__ == '__main__':
    # luigi.run() parses the command line (task name, --local-scheduler, ...)
    # and reports success as a truthy value. Propagate that as the process
    # exit status so a failed pipeline is visible to the calling shell
    # instead of always exiting with 0.
    raise SystemExit(0 if luigi.run() else 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment