@yusuke0519
Created August 18, 2015 08:09
[Luigi] My Workflows
import luigi
import luigi.scheduler
import luigi.worker
from components import *


class MyWorkflow(luigi.Task):
    '''
    An example luigi workflow task, which runs a whole workflow of other luigi
    tasks when executed.
    '''

    # Here we define the whole workflow in the requires function
    def requires(self):
        task_a = TaskA()
        task_b = TaskB(upstream_task=task_a)
        task_c = TaskC(upstream_task=task_b)
        return task_c

    # Define a simple marker file to show that the workflow has completed
    def output(self):
        return luigi.LocalTarget(self.input().path + '.workflow_complete')

    # Just write some text to the marker file
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Workflow complete!')


# If this file is executed as a script, let luigi take care of it:
if __name__ == '__main__':
    luigi.run()
@yusuke0519
Author

This looks useful when the workflow includes tasks that require Dynamic Tasks.
