Created
March 19, 2018 19:48
-
-
Save dlstadther/a76d4b597c8154019ea08aa036df8a18 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Test pipeline to test out complex dependency building | |
http://bionics.it/posts/luigi-tutorial | |
Concept breaks down when dynamic dependencies are introduced - ListParameters of Tasks cannot be serialized currently | |
# Note to self: | |
execute via `~/venv/luigi-3/bin/python3 test_dep.py --local-scheduler TaskA | |
""" | |
import datetime | |
import luigi | |
__author__ = 'Dillon Stadther' | |
__date__ = '2017-06-09' | |
class path(luigi.Config): | |
tmp_path = luigi.Parameter() | |
class TaskA(luigi.WrapperTask): | |
"""Wrapper for all execution""" | |
date = luigi.DateHourParameter(default=datetime.datetime.utcnow()) | |
param_a = 'task_a' | |
param_b = 'task_b' | |
param_c = 'task_c' | |
param_d = 'task_d' | |
param_e = 'task_e' | |
param_f = 'task_f' | |
def requires(self): | |
# A will require B and C | |
# B will require D | |
# C will require D and E | |
# D will require nothing | |
# E will require nothing | |
task_f = TaskF( | |
param='{}{}'.format(path().tmp_path, self.param_f), | |
upstream_task=[None] | |
) | |
task_e = TaskB( | |
param='{}{}'.format(path().tmp_path, self.param_e), | |
upstream_task=[None] | |
) | |
task_d = TaskB( | |
param='{}{}'.format(path().tmp_path, self.param_d), | |
upstream_task=[None] | |
) | |
task_c = TaskB( | |
param='{}{}'.format(path().tmp_path, self.param_c), | |
upstream_task=[task_d, task_e] | |
) | |
task_b = TaskB( | |
param='{}{}'.format(path().tmp_path, self.param_b), | |
upstream_task=[task_d] | |
) | |
yield [task_f, task_c, task_b] | |
class TaskB(luigi.Task): | |
"""Basic Task to output blank file""" | |
param = luigi.Parameter() | |
upstream_task = luigi.Parameter() | |
def requires(self): | |
return self.upstream_task | |
def output(self): | |
return luigi.LocalTarget(self.param) | |
def run(self): | |
self.output().open('w').close() | |
class TaskF(TaskB): | |
"""Example Task for Inheritance, overriding only the run method""" | |
def run(self): | |
with open(self.output().path, 'w') as outfile: | |
outfile.write('inheritance') | |
if __name__ == '__main__': | |
luigi.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment