Last active
November 26, 2018 07:47
-
-
Save LorePep/c6abd414cdf48e21c4ef96d23551a006 to your computer and use it in GitHub Desktop.
A simple multi-worker pipeline in Luigi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import luigi | |
# Number of flow ids the wrapper task fans out over (one task chain per id).
NUMBER_OF_FILES = 50
class WritePipelineTask(luigi.Task):
    """Luigi task that writes the text ``pipeline`` to a per-flow file.

    The file is ``data/output_one_<flow_id>.txt``.
    """

    # Identifier of the flow; it is embedded in the output file name.
    flow_id = luigi.Parameter()

    def output(self):
        """Return the local target this task writes for its ``flow_id``."""
        target_path = "data/output_one_{}.txt".format(self.flow_id)
        return luigi.LocalTarget(target_path)

    def run(self):
        """Write the word ``pipeline`` into the output target."""
        target = self.output()
        with target.open("w") as sink:
            sink.write("pipeline")
class AddMyTask(luigi.Task):
    """Luigi task that prefixes the upstream file's text with ``My ``.

    It reads the output of ``WritePipelineTask`` for the same ``flow_id``
    and writes the prefixed text to ``data/output_two_<flow_id>.txt``.
    """

    # Identifier of the flow; shared with the upstream task and embedded in
    # the output file name.
    flow_id = luigi.Parameter()

    def requires(self):
        """Depend on the ``WritePipelineTask`` of the same flow."""
        return WritePipelineTask(flow_id=self.flow_id)

    def output(self):
        """Return the local target this task writes for its ``flow_id``."""
        target_path = "data/output_two_{}.txt".format(self.flow_id)
        return luigi.LocalTarget(target_path)

    def run(self):
        """Read the upstream text, prepend ``My ``, and write the result."""
        with self.input().open("r") as source:
            upstream_text = source.read()

        decorated_text = "My " + upstream_text

        with self.output().open("w") as sink:
            sink.write(decorated_text)
class AllTasks(luigi.WrapperTask):
    """Wrapper task that depends on one ``AddMyTask`` per flow id."""

    def requires(self):
        """Yield an ``AddMyTask`` for each flow id in ``range(NUMBER_OF_FILES)``.

        Flow ids are passed to the tasks as strings.
        """
        for flow_id in map(str, range(NUMBER_OF_FILES)):
            yield AddMyTask(flow_id=flow_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment