@miku
Last active October 15, 2019 16:30
Dynamic requirement example.

Example outputs:

$ python dynamic.py DynamicRequirements --a 4 --b 8

DEBUG: Checking if DynamicRequirements(a=4, b=8) is complete
INFO: Scheduled DynamicRequirements(a=4, b=8)
DEBUG: Checking if SomeTask(number=4) is complete
INFO: Scheduled SomeTask(number=4)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 21375] Running   SomeTask(number=4)
INFO: [pid 21375] Done      SomeTask(number=4)
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21375] Running   DynamicRequirements(a=4, b=8)

4 x 8 = 32

INFO: [pid 21375] Done      DynamicRequirements(a=4, b=8)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
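
A note on the arithmetic in the log above: requires() returns eight SomeTask(number=4) objects, but they all share the same parameter and the same output file, and the log shows only one SomeTask being scheduled (Pending tasks: 2). self.input() still has eight entries, so the single file is read eight times and the sum comes out as 4 x 8 = 32. Below is a minimal stdlib-only sketch of that effect. The helper names (output_path, write_number, sum_inputs) are hypothetical and are not part of Luigi or of dynamic.py.

```python
import os
import tempfile


def output_path(directory, number):
    """Same naming scheme as SomeTask.output()."""
    return os.path.join(directory, 'number-%s.txt' % number)


def write_number(path, number):
    """Mimic SomeTask.run(): write the number followed by a newline."""
    with open(path, 'w') as output:
        output.write('%s\n' % number)


def sum_inputs(paths):
    """Mimic DynamicRequirements.run(): read every input and add the values."""
    total = 0
    for path in paths:
        with open(path) as handle:
            total += int(handle.read().strip())
    return total


a, b = 4, 8
workdir = tempfile.mkdtemp()

# b requirements, as in requires(), all pointing at the same output file.
paths = [output_path(workdir, a) for _ in range(b)]

# Only distinct outputs need to be produced: one file here.
distinct = set(paths)
for path in distinct:
    write_number(path, a)

total = sum_inputs(paths)
print("%s x %s = %s" % (a, b, total))
```

With a=4 and b=8 this prints the same product line as the log, while only one file is ever written.
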

And the super dynamic task:

$ python dynamic.py SuperDynamicRequirements --a 4

DEBUG: Checking if SuperDynamicRequirements(a=4) is complete
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
INFO: Scheduled SuperDynamicRequirements(a=4)
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
DEBUG: Checking if SomeTask(number=4) is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21439] Running   SuperDynamicRequirements(a=4)
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
DEBUG: Checking if RandomNumber() is complete
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread

4 x 10 = 40

INFO: [pid 21439] Done      SuperDynamicRequirements(a=4)
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker was stopped. Shutting down Keep-Alive thread
#!/usr/bin/env python
# coding: utf-8
import luigi
import random

__all__ = ['DynamicRequirements', 'SuperDynamicRequirements']


#
# Simple dynamic case. DynamicRequirements requires a number of SomeTask
# instances that depends on a parameter.
#
class SomeTask(luigi.Task):
    """
    Write a number to a file.
    """
    number = luigi.IntParameter()

    def run(self):
        with self.output().open('w') as output:
            output.write('%s\n' % self.number)

    def output(self):
        return luigi.LocalTarget('./number-%s.txt' % self.number)

class DynamicRequirements(luigi.Task):
    """
    This task has a variable number of requirements, based on a parameter.
    The required task itself depends on another parameter.
    """
    a = luigi.IntParameter(default=3)
    b = luigi.IntParameter(default=5)

    def requires(self):
        # b requirements, each parameterized by a.
        return [SomeTask(number=self.a) for i in range(self.b)]

    def run(self):
        total = 0
        for target in self.input():
            with target.open() as handle:
                total += int(handle.read().strip())
        print("%s x %s = %s" % (self.a, self.b, total))

    def complete(self):
        # No output target: always report incomplete so run() executes.
        return False

#
# Example for a more dynamic case. RandomNumber just generates a random number,
# but SuperDynamicRequirements will need to run RandomNumber within its requires
# method to get a parameter.
#
class RandomNumber(luigi.Task):
    """
    Generate a random number.
    """
    def run(self):
        with self.output().open('w') as output:
            output.write('%s\n' % random.randint(1, 10))

    def output(self):
        return luigi.LocalTarget('./random.txt')

class SuperDynamicRequirements(luigi.Task):
    """
    This task needs to run another task to find out what its requirements are.
    """
    a = luigi.IntParameter(default=3)

    def requires(self):
        # Run the prerequisite right here, so its output can be read below.
        prerequisite = RandomNumber()
        luigi.build([prerequisite], local_scheduler=True)
        with prerequisite.output().open() as handle:
            n = int(handle.read().strip())
        return [SomeTask(number=self.a) for i in range(n)]

    def run(self):
        total = 0
        for target in self.input():
            with target.open() as handle:
                total += int(handle.read().strip())
        print("%s x %s = %s" % (self.a, len(self.input()), total))

    def complete(self):
        # No output target: always report incomplete so run() executes.
        return False

if __name__ == '__main__':
    luigi.run()
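
SuperDynamicRequirements gets its dynamic behaviour by calling luigi.build inside requires(), which is why the log shows RandomNumber being checked again on every call to requires(). Luigi also lets run() yield further tasks as dynamic dependencies, which keeps the prerequisite out of requires(). The sketch below does not use Luigi at all. It only illustrates the generator control flow behind that idea with plain Python, under the assumption that a driver runs each yielded request and sends the result back. Every name in it (random_number, some_task, super_dynamic, drive) is hypothetical, and drive is not Luigi's worker.

```python
import random


def random_number():
    """Stand-in for RandomNumber: the value the parent task depends on."""
    return random.randint(1, 10)


def some_task(number):
    """Stand-in for SomeTask(number=...): its 'output' is just the number."""
    return number


def super_dynamic(a):
    """Generator-style task: each yield asks the driver to run something."""
    # First request: the prerequisite that decides how many inputs are needed.
    n = yield (random_number, ())
    # Second request: n identical requirements, known only at this point.
    values = yield [(some_task, (a,)) for _ in range(n)]
    return "%s x %s = %s" % (a, n, sum(values))


def drive(task):
    """Hypothetical driver: run each yielded request, send the result back."""
    request = next(task)
    while True:
        if isinstance(request, list):
            result = [func(*args) for func, args in request]
        else:
            func, args = request
            result = func(*args)
        try:
            request = task.send(result)
        except StopIteration as stop:
            # The generator's return value is the finished product line.
            return stop.value


line = drive(super_dynamic(4))
print(line)
```

The point of the design is that the requirement count is discovered while the task is already running, so nothing has to be built from inside requires().
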