Skip to content

Instantly share code, notes, and snippets.

@dangarthwaite
Last active October 21, 2015 18:36
Show Gist options
  • Save dangarthwaite/8ec2413fb1af3e480902 to your computer and use it in GitHub Desktop.
Skeleton luigi example.
#!/usr/bin/env python
# encoding: utf-8
import luigi
import os
class Validate(luigi.Task):
    """Validate the incoming data for a single region.

    Placeholder step: it prints a message and records the region name in
    its output file; the business logic still has to be written.
    """

    region = luigi.Parameter()

    def run(self):
        """Announce validation and write the region name to the output."""
        with self.output().open('w') as out:
            # A single-argument print(...) behaves the same on Python 2 and 3;
            # the original ``print "..."`` statement only parses on Python 2.
            print("Validate {}".format(self.region))
            out.write(self.region)

    def output(self):
        """Return the per-region marker file written by run()."""
        return luigi.LocalTarget('/tmp/data/{}/validate.log'.format(self.region))
class ImportGroups(luigi.Task):
    """Import the groups of one region after that region has been validated.

    Placeholder step: it only records a message in its output file.
    """

    region = luigi.Parameter()

    def requires(self):
        """Validation of the same region is a prerequisite."""
        return Validate(region=self.region)

    def run(self):
        """Write an 'Importing groups' line for the region to the output."""
        message = "Importing groups: {}".format(self.region)
        with self.output().open('w') as sink:
            sink.write(message)

    def output(self):
        """Return the per-region groups log written by run()."""
        path = '/tmp/data/{}/groups.log'.format(self.region)
        return luigi.LocalTarget(path)
class ImportUsers(luigi.Task):
    """Import the users of one region once its groups have been imported.

    Placeholder step: it only records a message in its output file.
    """

    region = luigi.Parameter()

    def requires(self):
        """Group import for the same region must finish first."""
        return ImportGroups(region=self.region)

    def run(self):
        """Write an 'Importing users' line for the region to the output."""
        message = "Importing users: {}".format(self.region)
        with self.output().open('w') as sink:
            sink.write(message)

    def output(self):
        """Return the per-region users log written by run()."""
        path = '/tmp/data/{}/users.log'.format(self.region)
        return luigi.LocalTarget(path)
class Fetch(luigi.Task):
    """Discover the regions that have incoming user files.

    Every ``<region>_users.csv`` file in ``/tmp/incoming`` contributes one
    region name. The names are written to ``/tmp/data/regions.log`` in
    sorted order, one per line.
    """

    def run(self):
        """Create /tmp/data if needed and list the regions found in /tmp/incoming."""
        import errno  # local import: only needed for the mkdir error check

        try:
            os.mkdir('/tmp/data', 0o770)  # 0o770 parses on Python 2.6+ and 3
        except OSError as err:
            # An already-existing directory is fine; anything else (e.g. a
            # permission problem) is a real failure and must not be hidden.
            if err.errno != errno.EEXIST:
                raise

        suffix = '_users.csv'
        # Strip the suffix so downstream tasks receive bare region names
        # (e.g. "emea", not "emea_users.csv"). Sorting makes the output
        # deterministic, and a file named exactly "_users.csv" is skipped
        # because it would yield an empty region.
        regions = sorted(
            name[:-len(suffix)]
            for name in os.listdir('/tmp/incoming')
            if name.endswith(suffix) and len(name) > len(suffix)
        )

        with self.output().open('w') as out:
            for region in regions:
                # Code here to transfer files to /tmp/data
                out.write(region + '\n')

    def output(self):
        """Return the file listing one discovered region per line."""
        return luigi.LocalTarget('/tmp/data/regions.log')
class ImportRegions(luigi.Task):
    """Import users for every region listed by Fetch.

    Each region becomes a dynamic ``ImportUsers`` dependency. The output
    file lists the imported regions, one per line.
    """

    def requires(self):
        return Fetch()

    def run(self):
        """Schedule ImportUsers for each listed region, then record them."""
        # Read the region list through a context manager so the handle is
        # closed; blank lines are dropped rather than becoming ImportUsers('').
        with self.input().open('r') as regions_file:
            regions = [line.strip() for line in regions_file]
        regions = [region for region in regions if region]

        # Yield all dependencies in one batch, and do it before the output is
        # opened. When a yielded task is incomplete, luigi abandons this
        # generator and calls run() again from the start after the task
        # finishes. Yielding one task at a time inside the open output would
        # restart run() once per region.
        if regions:
            yield [ImportUsers(region) for region in regions]

        with self.output().open('w') as out:
            for region in regions:
                out.write(region + '\n')

    def output(self):
        """Return the log of regions whose users were imported."""
        return luigi.LocalTarget('/tmp/data/import.log')
if __name__ == '__main__':
    # Run the pipeline from the command line, starting at ImportRegions and
    # using luigi's in-process local scheduler instead of a central daemon.
    luigi.run(local_scheduler=True, main_task_cls=ImportRegions)
@dangarthwaite
Copy link
Author

In case it isn't obvious: every `def run(self):` method above is a placeholder and needs real business logic.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment