Skip to content

Instantly share code, notes, and snippets.

@LorePep
Last active November 26, 2018 07:47
Show Gist options
  • Save LorePep/c6abd414cdf48e21c4ef96d23551a006 to your computer and use it in GitHub Desktop.
Save LorePep/c6abd414cdf48e21c4ef96d23551a006 to your computer and use it in GitHub Desktop.
A simple multi-worker pipeline in Luigi
import luigi
# Number of independent flows the AllTasks wrapper fans out over. Flow i
# (flow_id str(i), i in range(NUMBER_OF_FILES)) writes
# data/output_one_<i>.txt and data/output_two_<i>.txt.
NUMBER_OF_FILES = 50
class WritePipelineTask(luigi.Task):
    """First stage of a flow: write the text ``pipeline`` to a per-flow file.

    Parameters
    ----------
    flow_id : luigi.Parameter
        Identifier of the flow. It is embedded in the output file name so
        that every flow writes to its own target.
    """

    flow_id = luigi.Parameter()

    def output(self):
        """Return the per-flow target ``data/output_one_<flow_id>.txt``."""
        path_template = "data/output_one_{}.txt"
        return luigi.LocalTarget(path_template.format(self.flow_id))

    def run(self):
        """Write the literal text ``pipeline`` (no trailing newline) to the target."""
        target = self.output()
        with target.open("w") as sink:
            sink.write("pipeline")
class AddMyTask(luigi.Task):
    """Second stage of a flow: prefix the first stage's text with ``"My "``.

    Depends on WritePipelineTask for the same ``flow_id`` and writes the
    decorated text to ``data/output_two_<flow_id>.txt``.

    Parameters
    ----------
    flow_id : luigi.Parameter
        Identifier of the flow; forwarded unchanged to the upstream task.
    """

    flow_id = luigi.Parameter()

    def output(self):
        """Return the per-flow target ``data/output_two_<flow_id>.txt``."""
        path_template = "data/output_two_{}.txt"
        return luigi.LocalTarget(path_template.format(self.flow_id))

    def requires(self):
        """Depend on the first stage of the same flow."""
        return WritePipelineTask(flow_id=self.flow_id)

    def run(self):
        """Read the upstream file in full and write it back prefixed with ``"My "``."""
        with self.input().open("r") as source:
            upstream_text = source.read()
        decorated = "My " + upstream_text
        with self.output().open("w") as sink:
            sink.write(decorated)
class AllTasks(luigi.WrapperTask):
    """Entry point that fans out to ``number_of_files`` independent flows.

    Flow ``i`` (``0 <= i < number_of_files``) is an AddMyTask with
    ``flow_id=str(i)``, which in turn requires its own WritePipelineTask.
    No two flows share a dependency, so multiple Luigi workers
    (e.g. ``--workers N``) can process them concurrently.

    Parameters
    ----------
    number_of_files : luigi.IntParameter
        How many flows to schedule. Defaults to the module constant
        NUMBER_OF_FILES, so existing invocations behave exactly as before;
        it can be overridden per run (e.g. ``--number-of-files 10``).
    """

    # Previously the count was hard-coded to NUMBER_OF_FILES; exposing it as a
    # parameter (with the same default) lets callers size the run without
    # editing the module.
    number_of_files = luigi.IntParameter(default=NUMBER_OF_FILES)

    def requires(self):
        """Yield one AddMyTask per flow id ``"0"`` .. ``str(number_of_files - 1)``."""
        for i in range(self.number_of_files):
            # flow_id is a string Parameter, so the index is passed as text.
            yield AddMyTask(flow_id=str(i))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment