
@samuell
Created December 3, 2013 18:51
Demonstrates how to select which parent task a luigi task depends on, by passing the parent task's class name as a parameter.
import luigi


class ATask(luigi.Task):
    def output(self):
        return luigi.LocalTarget("atask_output.txt")

    def run(self):
        with self.output().open("w") as outfile:
            outfile.write("Test")
        print("ATask was run")


class BTask(luigi.Task):
    def requires(self):
        return ATask()

    def output(self):
        return luigi.LocalTarget("btask_output.txt")

    def run(self):
        with self.output().open("w") as outfile:
            outfile.write("Test")
        print("BTask was run")


class CTask(luigi.Task):
    # Name of the task class to depend on, e.g. "ATask" or "BTask"
    parent_class = luigi.Parameter()

    def get_class(self, class_name):
        # Look up the class by name among this module's global names
        return globals()[class_name]

    def requires(self):
        return self.get_class(self.parent_class)()

    def output(self):
        return luigi.LocalTarget("ctask_output.txt")

    def run(self):
        with self.output().open("w") as outfile:
            outfile.write("Test")
        print("CTask was run")


if __name__ == '__main__':
    luigi.run()
samuell commented Dec 3, 2013

Now CTask can depend on either ATask or BTask, depending on which of the following commands you run:

python test.py --local-scheduler CTask --parent-class ATask

and:

python test.py --local-scheduler CTask --parent-class BTask
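
One caveat with the `globals()` lookup in `get_class`: any module-level name will resolve, not just task classes, and a typo surfaces as a bare `KeyError`. A minimal sketch of a stricter variant, without luigi so it runs on its own, might look like this. The plain `ATask`/`BTask` stand-ins and the `PARENT_TASKS` registry are hypothetical names for illustration and are not part of the gist:

```python
# Plain classes standing in for the luigi tasks (illustration only).
class ATask:
    pass


class BTask:
    pass


# Hypothetical allow-list: only these classes may be selected by name.
PARENT_TASKS = {cls.__name__: cls for cls in (ATask, BTask)}


def get_class(class_name):
    """Return the class registered under class_name, or raise ValueError."""
    try:
        return PARENT_TASKS[class_name]
    except KeyError:
        known = ", ".join(sorted(PARENT_TASKS))
        raise ValueError(
            "Unknown parent task %r; expected one of: %s" % (class_name, known)
        ) from None


if __name__ == "__main__":
    parent = get_class("BTask")()  # instantiate the class chosen by name
    print(type(parent).__name__)
```

In the gist itself, the same idea would mean replacing the body of `CTask.get_class` with a lookup in such a dictionary.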

acdameli commented Mar 2, 2020

Randomly came across this gist while googling something... Turns out that in the last 7 years they've added a TaskParameter (:rofl:). For others who stumble across this, an alternative implementation for CTask would be:

class CTask(luigi.Task):
    required_task = luigi.TaskParameter()
    def requires(self):
        # whatever args you need to pass through, keeping in mind:
        # `--BTask-some-param 'BTask Param Value'` can be used
        # for global assignment of BTask instances
        args = []
        return self.required_task(*args)
