Skip to content

Instantly share code, notes, and snippets.

@semyont
Forked from demoray/mytasks.py
Created June 11, 2017 08:33
Show Gist options
  • Save semyont/fdcb04c53bcce6418f60cf9b8d3961ab to your computer and use it in GitHub Desktop.
Save semyont/fdcb04c53bcce6418f60cf9b8d3961ab to your computer and use it in GitHub Desktop.
A custom `complete()` method for luigi tasks that re-runs a task whenever any output of a prerequisite task is newer than the task's own output.
class MyTask(luigi.Task):
    """Luigi task whose completeness also depends on output freshness.

    On top of luigi's normal ``complete()`` check, the task is reported as
    incomplete (and will therefore be re-run) when:

    * any existing output of a required task was modified more recently
      than the oldest of this task's own outputs, or
    * any required task is itself incomplete by this same rule (the check
      recurses through ``requires()``).

    NOTE(review): outputs of this task and of its requirements are assumed
    to be local file targets whose ``path`` works with ``os.path.getmtime``
    (e.g. ``luigi.LocalTarget``); confirm before using other target types.
    """

    def complete(self):
        """Return True only if outputs exist and are newer than all prereqs."""

        def _mtimes(targets):
            # Modification times of the targets that currently exist.
            # Missing targets are skipped; a missing prerequisite output is
            # caught by the recursive complete() check at the end.
            return [os.path.getmtime(target.path)
                    for target in targets if target.exists()]

        # Not complete by luigi's normal rules (e.g. an output is missing).
        if not super(MyTask, self).complete():
            return False

        required = luigi.task.flatten(self.requires())
        if not required:
            # No prerequisites, so nothing can be newer than our outputs.
            return True

        output_times = _mtimes(luigi.task.flatten(self.output()))

        prereq_targets = []
        for task in required:
            prereq_targets.extend(luigi.task.flatten(task.output()))
        prereq_times = _mtimes(prereq_targets)

        # 0.0 fallbacks keep the original semantics for empty lists.
        oldest_output = min(output_times) if output_times else 0.0
        newest_prereq = max(prereq_times) if prereq_times else 0.0

        # Re-run if any prerequisite output is newer than the OLDEST of our
        # outputs; comparing against the newest output would let a stale
        # output hide behind a freshly written sibling.
        if newest_prereq > oldest_output:
            return False

        # Prerequisites must themselves be up to date. A generator lets
        # all() stop at the first incomplete prerequisite.
        return all(task.complete() for task in required)
class C(MyTask):
    """Leaf task: writes its ``size`` parameter into a per-size file."""

    size = luigi.Parameter()

    def output(self):
        # One file per distinct size value.
        path = '/tmp/c-size-%s' % self.size
        return luigi.LocalTarget(path)

    def run(self):
        target = self.output()
        with target.open('w') as out:
            out.write('SIZE %s\n' % self.size)
class B(MyTask):
    """Middle task: prefixes the text produced by ``C`` with its own size."""

    size = luigi.IntParameter()

    def requires(self):
        # C takes a plain (string) parameter, so pass the decimal form.
        return C('%d' % self.size)

    def output(self):
        path = '/tmp/b-size-%d' % self.size
        return luigi.LocalTarget(path)

    def run(self):
        source = self.input()
        with source.open('r') as src:
            upstream_text = src.read()
        target = self.output()
        with target.open('w') as dst:
            dst.write('SIZE %d = %s' % (self.size, upstream_text))
class A(MyTask):
    """Top-level task: records what ``B(1)`` produced in ``/tmp/output``."""

    def requires(self):
        return B(1)

    def output(self):
        return luigi.LocalTarget('/tmp/output')

    def run(self):
        source = self.input()
        with source.open('r') as src:
            received = src.read()
        target = self.output()
        with target.open('w') as dst:
            dst.write('I got: %s' % received)
$ PYTHONPATH=$PYTHONPATH:. luigi --module mytasks A 2> /dev/null
$ cat /tmp/output
I got: SIZE 1 = SIZE 1
$ cat /tmp/b-size-1
SIZE 1 = SIZE 1
$ cat /tmp/c-size-1
SIZE 1
$ echo wut > /tmp/b-size-1
$ PYTHONPATH=$PYTHONPATH:. luigi --module mytasks A 2> /dev/null
$ cat /tmp/output
I got: wut
$ cat /tmp/b-size-1
wut
$ cat /tmp/c-size-1
SIZE 1
$ echo hi mom > /tmp/c-size-1
$ PYTHONPATH=$PYTHONPATH:. luigi --module mytasks A 2> /dev/null
$ cat /tmp/output
I got: SIZE 1 = hi mom
$ cat /tmp/b-size-1
SIZE 1 = hi mom
$ cat /tmp/c-size-1
hi mom
$
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment