@hoffrocket
Forked from miku/README.md
Last active December 27, 2015 12:49

Example outputs:

$ python dynamic.py DynamicRequirements --a 4 --b 8

DEBUG: Checking if DynamicRequirements(a=4, b=8) is complete
INFO: Scheduled DynamicRequirements(a=4, b=8)
DEBUG: Checking if SomeTask(number=4) is complete
INFO: Scheduled SomeTask(number=4)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 21375] Running   SomeTask(number=4)
INFO: [pid 21375] Done      SomeTask(number=4)
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21375] Running   DynamicRequirements(a=4, b=8)

4 x 8 = 32

INFO: [pid 21375] Done      DynamicRequirements(a=4, b=8)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
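DynamicRequirements asks for b copies of SomeTask(number=a). Luigi treats tasks with the same class and parameters as one task. That is why the log above shows only two pending tasks and a single `Running SomeTask(number=4)`. Even so, `self.input()` still returns b targets, so run() adds a to itself b times. The plain-Python sketch below imitates that effect with a hypothetical `CachedTask` class whose `__new__` hands back one shared instance per parameter value. It illustrates the idea only and is not Luigi's own code.

```python
class CachedTask:
    """Hypothetical stand-in for SomeTask: one shared instance per parameter value."""

    _instances = {}

    def __new__(cls, number):
        key = (cls, number)
        if key not in cls._instances:
            instance = super().__new__(cls)
            instance.number = number
            cls._instances[key] = instance
        return cls._instances[key]


a, b = 4, 8
requirements = [CachedTask(a) for _ in range(b)]  # like DynamicRequirements.requires()
distinct = {id(task) for task in requirements}     # tasks that would actually run
total = sum(task.number for task in requirements)  # like summing over self.input()
print(len(requirements), len(distinct), total)     # 8 1 32
```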

And the super dynamic task:

$ python dynamic.py SuperDynamicRequirements --a 4

DEBUG: Checking if SuperDynamicRequirements(a=4) is complete
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
INFO: Scheduled SuperDynamicRequirements(a=4)
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21439] Running   SuperDynamicRequirements(a=4)
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread

4 x 10 = 40

INFO: [pid 21439] Done      SuperDynamicRequirements(a=4)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
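In this run, each block that goes from `Checking if RandomNumber() is complete` to `Worker was stopped` comes from the nested `luigi.build()` call inside `SuperDynamicRequirements.requires()`. The block appears five times, which suggests requires() was evaluated five times and repeated the nested build on every call. If that repetition matters, one option is to memoize the computed requirements per task instance. The sketch below is plain Python with hypothetical names (`NestedBuild`, `EagerRequires`, `MemoizedRequires`). It uses a fixed value of 10 in place of random.txt and only counts how often the nested build runs. It has not been checked against Luigi itself, and `functools.cached_property` needs Python 3.8 or later.

```python
import functools


class NestedBuild:
    """Hypothetical stand-in for luigi.build([RandomNumber()]) plus reading random.txt."""

    def __init__(self, value):
        self.value = value
        self.calls = 0

    def __call__(self):
        self.calls += 1  # one nested build (one log block) per call
        return self.value


class EagerRequires:
    """Mirrors SuperDynamicRequirements: requires() redoes the nested build every time."""

    def __init__(self, a, nested_build):
        self.a = a
        self.nested_build = nested_build

    def requires(self):
        n = self.nested_build()
        return [self.a] * n  # n copies of the same requirement


class MemoizedRequires(EagerRequires):
    """Same requirements, but the nested build runs at most once per instance."""

    @functools.cached_property
    def _requirements(self):
        return super().requires()

    def requires(self):
        return self._requirements


eager = EagerRequires(4, NestedBuild(10))
memo = MemoizedRequires(4, NestedBuild(10))
for _ in range(5):  # five evaluations, as in the log above
    eager.requires()
    memo.requires()
print(eager.nested_build.calls, memo.nested_build.calls)  # 5 1
print(sum(memo.requires()))  # 40, i.e. 4 x 10
```

The memoized variant still fails if the first nested build cannot produce random.txt. `cached_property` stores nothing when the getter raises, so a later call would simply retry.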

Output of the original version (SuperDynamicRequirements, which calls luigi.build() inside requires()) with an unfulfilled requirement on RandomNumber:

$ python dynamic.py --local-scheduler SuperDynamicRequirements --a 4
DEBUG: Checking if SuperDynamicRequirements(a=4) is complete
DEBUG: Checking if RandomNumber() is complete
INFO: Scheduled RandomNumber()
DEBUG: Checking if PathTask(path=./random-ready.txt) is complete
WARNING: Task PathTask(path=./random-ready.txt) is not complete and run() is not implemented. Probably a missing external dependency.
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 1 pending tasks possibly being run by other workers
INFO: Worker was stopped. Shutting down Keep-Alive thread
ERROR: Luigi unexpected framework error while scheduling SuperDynamicRequirements(a=4)
Traceback (most recent call last):
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/worker.py", line 143, in add
    for next in self._add(current):
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/worker.py", line 198, in _add_task_and_deps
    deps = task.deps()
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/task.py", line 327, in deps
    return flatten(self._requires())
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/task.py", line 320, in _requires
    return flatten(self.requires())  # base impl
  File "dynamic.py", line 84, in requires
    n = int(prerequisite.output().open().read().strip())
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/file.py", line 99, in open
    return open(self.path, mode)
IOError: [Errno 2] No such file or directory: './random.txt'
INFO: Sending warning email to None
DEBUG: Emailing:
-------------
To: (None,)
From: luigi-client@1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.ip6.arpa
Subject: Luigi: Framework error while scheduling SuperDynamicRequirements(a=4)
Message:
Luigi framework error:
Traceback (most recent call last):
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/worker.py", line 143, in add
    for next in self._add(current):
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/worker.py", line 198, in _add_task_and_deps
    deps = task.deps()
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/task.py", line 327, in deps
    return flatten(self._requires())
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/task.py", line 320, in _requires
    return flatten(self.requires())  # base impl
  File "dynamic.py", line 84, in requires
    n = int(prerequisite.output().open().read().strip())
  File "/Users/jon/.virtualenvs/luigideptest/lib/python2.7/site-packages/luigi/file.py", line 99, in open
    return open(self.path, mode)
IOError: [Errno 2] No such file or directory: './random.txt'

-------------
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread

Output with the dynamic requirements moved into the run method (SuperDynamicRequirements2), while random-ready.txt is still missing:

$ python dynamic.py --local-scheduler SuperDynamicRequirements2 --a 4
DEBUG: Checking if SuperDynamicRequirements2(a=4) is complete
INFO: Scheduled SuperDynamicRequirements2(a=4)
DEBUG: Checking if RandomNumber() is complete
INFO: Scheduled RandomNumber()
DEBUG: Checking if PathTask(path=./random-ready.txt) is complete
WARNING: Task PathTask(path=./random-ready.txt) is not complete and run() is not implemented. Probably a missing external dependency.
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: There are 2 pending tasks possibly being run by other workers
INFO: Worker was stopped. Shutting down Keep-Alive thread

Output after fulfilling the requirement:

$ touch random-ready.txt
$ python dynamic.py --local-scheduler SuperDynamicRequirements2 --a 4
DEBUG: Checking if SuperDynamicRequirements2(a=4) is complete
INFO: Scheduled SuperDynamicRequirements2(a=4)
DEBUG: Checking if RandomNumber() is complete
INFO: Scheduled RandomNumber()
DEBUG: Checking if PathTask(path=./random-ready.txt) is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 50488] Running   RandomNumber()
INFO: [pid 50488] Done      RandomNumber()
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 50488] Running   SuperDynamicRequirements2(a=4)
DEBUG: Checking if SomeTask(number=4) is complete
INFO: Scheduled SomeTask(number=4)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 50488] Running   SomeTask(number=4)
INFO: [pid 50488] Done      SomeTask(number=4)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
4 x 10 = 40
INFO: [pid 50488] Done      SuperDynamicRequirements2(a=4)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
#!/usr/bin/env python
# coding: utf-8
import luigi
import random

__all__ = ['DynamicRequirements', 'SuperDynamicRequirements', 'SuperDynamicRequirements2']

#
# Simple dynamic case. DynamicRequirements requires a variable number of
# SomeTask instances.
#

class SomeTask(luigi.Task):
    """
    Just write a number to a file.
    """
    number = luigi.IntParameter()

    def run(self):
        with self.output().open('w') as output:
            output.write('%s\n' % self.number)

    def output(self):
        return luigi.LocalTarget('./number-%s.txt' % self.number)


class DynamicRequirements(luigi.Task):
    """
    This task has a variable number of requirements, based on a parameter.
    The required task itself depends on another parameter.
    """
    a = luigi.IntParameter(default=3)
    b = luigi.IntParameter(default=5)

    def requires(self):
        # b copies of the same SomeTask: it runs only once, but input()
        # still returns b targets, so run() computes a x b by repeated addition.
        return [SomeTask(number=self.a) for i in range(self.b)]

    def run(self):
        total = 0
        for target in self.input():
            with target.open() as handle:
                total += int(handle.read().strip())
        print("%s x %s = %s" % (self.a, self.b, total))

    def complete(self):
        # Never complete, so the task runs (and prints) every time.
        return False


class PathTask(luigi.ExternalTask):
    # Not produced by Luigi: the file must be created externally,
    # e.g. with `touch random-ready.txt`.
    path = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.path)

#
# Example for a more dynamic case. RandomNumber just generates a random number,
# but SuperDynamicRequirements will need to run RandomNumber within its requires
# method to get a parameter.
#

class RandomNumber(luigi.Task):
    """
    Generate a random number.
    """
    def requires(self):
        return PathTask(path='./random-ready.txt')

    def run(self):
        with self.output().open('w') as output:
            output.write('%s\n' % random.randint(1, 10))

    def output(self):
        return luigi.LocalTarget('./random.txt')


class SuperDynamicRequirements(luigi.Task):
    """
    This task needs to run another task to find out what its requirements are.
    """
    a = luigi.IntParameter(default=3)

    def requires(self):
        # Nested build: run RandomNumber now to learn how many requirements
        # there are. If random-ready.txt is missing, RandomNumber cannot run
        # and reading random.txt below raises IOError.
        prerequisite = RandomNumber()
        luigi.build([prerequisite], local_scheduler=True)
        n = int(prerequisite.output().open().read().strip())
        return [SomeTask(number=self.a) for i in range(n)]

    def run(self):
        total = 0
        for target in self.input():
            with target.open() as handle:
                total += int(handle.read().strip())
        print("%s x %s = %s" % (self.a, len(self.input()), total))

    def complete(self):
        return False


class SuperDynamicRequirements2(luigi.Task):
    """
    This task needs to run another task to find out what its requirements are.
    """
    a = luigi.IntParameter(default=3)

    def requires(self):
        # RandomNumber is an ordinary, static requirement here.
        return RandomNumber()

    def run(self):
        # random.txt exists by the time run() is called, so the SomeTask
        # requirements can be discovered and built on the fly.
        n = int(self.input().open().read().strip())
        tasks = [SomeTask(number=self.a) for i in range(n)]
        luigi.build(tasks, local_scheduler=True)
        total = 0
        for task in tasks:
            with task.output().open() as handle:
                total += int(handle.read().strip())
        print("%s x %s = %s" % (self.a, len(tasks), total))

    def complete(self):
        return False


if __name__ == '__main__':
    luigi.run()
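SuperDynamicRequirements2 avoids the failure by keeping RandomNumber as a static requirement and discovering the SomeTask requirements inside run(). Later Luigi releases support this pattern directly. There, run() can yield a task or a list of tasks as dynamic dependencies instead of calling luigi.build() itself. The toy scheduler below models that idea in plain Python. `ToyTask`, `ToyRandomNumber`, `ToySomeTask`, `ToyMultiply` and `build()` are hypothetical names, and a dict stands in for the output files. This is a simplified recursion, not Luigi's worker. The real worker suspends the task and may restart run() from the beginning, so run() should be safe to re-execute.

```python
import random
from typing import Dict, Iterator, List, Optional

Store = Dict[str, int]  # stands in for the file system: target name -> file contents


class ToyTask:
    """Hypothetical minimal task: complete once its output exists in the store."""

    def output(self) -> str:
        raise NotImplementedError

    def complete(self, store: Store) -> bool:
        return self.output() in store

    def run(self, store: Store) -> Optional[Iterator[List["ToyTask"]]]:
        raise NotImplementedError


class ToyRandomNumber(ToyTask):
    def output(self) -> str:
        return "random.txt"

    def run(self, store: Store) -> None:
        store[self.output()] = random.randint(1, 10)


class ToySomeTask(ToyTask):
    def __init__(self, number: int) -> None:
        self.number = number

    def output(self) -> str:
        return "number-%d.txt" % self.number

    def run(self, store: Store) -> None:
        store[self.output()] = self.number


class ToyMultiply(ToyTask):
    """Like SuperDynamicRequirements2, but yields its requirements from run()."""

    def __init__(self, a: int) -> None:
        self.a = a

    def output(self) -> str:
        return "product-%d.txt" % self.a

    def run(self, store: Store) -> Iterator[List[ToyTask]]:
        yield [ToyRandomNumber()]  # first dynamic dependency
        n = store["random.txt"]
        tasks = [ToySomeTask(self.a) for _ in range(n)]
        yield tasks  # second batch, sized by the first result
        store[self.output()] = sum(store[t.output()] for t in tasks)


def build(task: ToyTask, store: Store) -> None:
    """Run a task; if run() is a generator, build each yielded batch, then resume it."""
    if task.complete(store):
        return
    steps = task.run(store)
    if steps is None:  # ordinary task: run() already did all the work
        return
    for batch in steps:
        for dep in batch:
            build(dep, store)


random.seed(0)
store: Store = {}
build(ToyMultiply(4), store)
n = store["random.txt"]
print("4 x %d = %d" % (n, store["product-4.txt"]))
```

After the first ToySomeTask(4) has written its output, the remaining copies are already complete and build() skips them. This loosely mirrors the logs above, where SomeTask(number=4) runs only once.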