Skip to content

Instantly share code, notes, and snippets.

@LorePep
Last active July 17, 2020 07:48
Show Gist options
  • Save LorePep/b24f73f1da8876ee9e1204678f852ce7 to your computer and use it in GitHub Desktop.
Save LorePep/b24f73f1da8876ee9e1204678f852ce7 to your computer and use it in GitHub Desktop.
A luigi pipeline with multiple files
import luigi
# Count of numbered data files (indices 0 .. NUMBER_OF_FILES - 1) that each
# task below writes and, for the downstream task, reads.
NUMBER_OF_FILES: int = 50
class WritePipelineTask(luigi.Task):
    """Write NUMBER_OF_FILES seed files, then a completion marker.

    Each ``data/output_one_<i>.txt`` (for i in 0 .. NUMBER_OF_FILES - 1)
    receives the text ``"pipeline"``. The only declared output is the
    ``data/_DONE_WRITE_PIPELINE_TASK`` marker, which is created after every
    data file has been written. Luigi therefore treats the task as complete
    only once the whole batch exists.
    """

    def output(self):
        """Return the marker target whose existence marks this task as done."""
        return luigi.LocalTarget("data/_DONE_WRITE_PIPELINE_TASK")

    def run(self):
        """Write every data file, then create the (empty) marker file."""
        for i in range(NUMBER_OF_FILES):
            output_path = "data/output_one_{:d}.txt".format(i)
            # LocalTarget.open("w") creates missing parent directories (a bare
            # open() raises FileNotFoundError when data/ does not exist). It
            # also writes to a temporary file that is renamed into place on
            # close, so a crash never leaves a half-written file under the
            # final name.
            with luigi.LocalTarget(output_path).open("w") as output_file:
                output_file.write("pipeline")
        # Build the marker from output() rather than repeating its path, so
        # the file created here is always the one luigi checks for completion.
        with self.output().open("w"):
            pass
class AddMyTask(luigi.Task):
    """Prefix every file produced by WritePipelineTask with ``"My "``.

    For each i in 0 .. NUMBER_OF_FILES - 1, this task reads the full contents
    of ``data/output_one_<i>.txt`` and writes them, prefixed with ``"My "``,
    to ``data/output_two_<i>.txt``. The only declared output is the
    ``data/_DONE_ADD_MY_TASK`` marker, which is created after all files have
    been written.
    """

    def output(self):
        """Return the marker target whose existence marks this task as done."""
        return luigi.LocalTarget("data/_DONE_ADD_MY_TASK")

    def requires(self):
        """Depend on the task that writes the ``output_one_*`` files."""
        # WritePipelineTask creates its marker only after writing every
        # output_one_* file, so those files exist once it is complete.
        return WritePipelineTask()

    def run(self):
        """Write a prefixed copy of every input file, then create the marker."""
        for i in range(NUMBER_OF_FILES):
            input_path = "data/output_one_{:d}.txt".format(i)
            output_path = "data/output_two_{:d}.txt".format(i)
            with open(input_path) as input_file:
                content = input_file.read()  # whole file, not a single line
            # Atomic write: the data lands in a temporary file and is renamed
            # to output_path only on a clean close, so no partial output is
            # left behind under the final name.
            with luigi.LocalTarget(output_path).open("w") as output_file:
                output_file.write("My " + content)
        # Build the marker from output() rather than repeating its path.
        with self.output().open("w"):
            pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment