Skip to content

Instantly share code, notes, and snippets.

@mrsiano
Created August 6, 2020 09:02
Show Gist options
  • Save mrsiano/f1d5153bcf4fbc3acb6946f455b9935e to your computer and use it in GitHub Desktop.
Save mrsiano/f1d5153bcf4fbc3acb6946f455b9935e to your computer and use it in GitHub Desktop.
simple-dag.py
#!/usr/bin/env python
import logging
import time
import luigi
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class Init(luigi.Task):
    """Parameterless first step of the DAG: waits, then writes a marker file."""

    # Class-level flag; nothing in this class reads it.
    _complete = False

    def output(self):
        """Return the local marker file this task produces."""
        # NOTE(review): the directory is spelled "Inti" (not "Init") in the
        # original path; kept as-is so existing marker files still match.
        marker = luigi.LocalTarget('./data/Inti/init')
        return marker

    def run(self):
        """Sleep for 30 seconds, then write ``ok`` to the marker file."""
        time.sleep(30)
        target = self.output()
        with target.open('w') as handle:
            handle.write('ok')
class DailyUpdates(luigi.Task):
    """Per-id update step that runs after ``Init``.

    Parameters:
        id: identifier used as the file name of this task's output.
    """

    id = luigi.Parameter()

    def requires(self):
        """Depend on the shared ``Init`` task."""
        return Init()

    def output(self):
        """Return the local target ``./data/DailyUpdates/<id>``."""
        # An f-string (as used by the sibling tasks) formats any value of
        # ``id`` as a single field; ``'%s' % value`` would unpack a tuple.
        return luigi.LocalTarget(path=f'./data/DailyUpdates/{self.id}')

    def run(self):
        """Sleep for one second, then write ``ok`` to the output target."""
        time.sleep(1)
        with self.output().open('w') as f:
            f.write('ok')
class Learning1(luigi.Task):
    """Per-id learning step that runs after ``DailyUpdates`` for the same id.

    Parameters:
        id: identifier forwarded to ``DailyUpdates`` and used in the output
            file name.
    """

    id = luigi.Parameter()

    def requires(self):
        """Depend on the ``DailyUpdates`` task with the same ``id``."""
        upstream = DailyUpdates(id=self.id)
        return upstream

    def output(self):
        """Return the local target ``./data/Learning1/<id>.csv``."""
        destination = f'./data/Learning1/{self.id}.csv'
        return luigi.LocalTarget(path=destination)

    def run(self):
        """Sleep for one second, then write ``ok`` to the output target."""
        time.sleep(1)
        target = self.output()
        with target.open('w') as handle:
            handle.write('ok')
class Predict(luigi.Task):
    """Per-id prediction step that runs after ``Learning1`` for the same id.

    Parameters:
        id: identifier forwarded to ``Learning1`` and used in the output
            file name.
    """

    id = luigi.Parameter()

    def requires(self):
        """Yield the single ``Learning1`` task with the same ``id``."""
        upstream = Learning1(id=self.id)
        yield upstream

    def output(self):
        """Return the local target ``./data/Predict/<id>.csv``."""
        destination = f'./data/Predict/{self.id}.csv'
        return luigi.LocalTarget(path=destination)

    def run(self):
        """Sleep for one second, then write ``ok`` to the output target."""
        time.sleep(1)
        target = self.output()
        with target.open('w') as handle:
            handle.write('ok')
class Main(luigi.WrapperTask):
    """Entry-point task that fans out to three ``Predict`` runs.

    This task has no ``output()`` or ``run()`` of its own; it only groups its
    requirements. ``luigi.WrapperTask`` is the base class Luigi provides for
    that case: it is considered complete once all required tasks are
    complete, whereas a plain ``luigi.Task`` without outputs never reports
    itself as complete.
    """

    def requires(self):
        """Yield a ``Predict`` task for each of the ids "0", "1" and "2"."""
        # ``index`` rather than ``id`` so the builtin ``id`` is not shadowed.
        for index in range(3):
            # ``Predict.id`` is a plain ``luigi.Parameter`` (string-typed), so
            # pass a string; the resulting output paths are unchanged.
            yield Predict(id=str(index))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment