Skip to content

Instantly share code, notes, and snippets.

@akashrchandran
Created March 12, 2025 17:35
Show Gist options
  • Select an option

  • Save akashrchandran/36911e11cdb243c4a70356901f66fe1f to your computer and use it in GitHub Desktop.

Select an option

Save akashrchandran/36911e11cdb243c4a70356901f66fe1f to your computer and use it in GitHub Desktop.
A simple example showing how Luigi, a batch-processing framework, works.
import luigi
class ExtractData(luigi.Task):
    """First pipeline stage: write a fixed list of sample words to a file.

    Parameters:
        output_path: Path of the file to create; defaults to
            ``extracted_data.txt``.
    """

    output_path = luigi.Parameter(default="extracted_data.txt")

    def output(self):
        """Return the local file target located at ``output_path``."""
        return luigi.LocalTarget(self.output_path)

    def run(self):
        """Write every sample word on its own line, then report the path."""
        words = ["hello", "world", "luigi", "task", "example"]
        target = self.output()
        with target.open("w") as sink:
            for word in words:
                sink.write(f"{word}\n")
        print(f"Extracted data to {target.path}")
class TransformData(luigi.Task):
    """Second pipeline stage: upper-case every line produced by ExtractData.

    Parameters:
        input_path: Path handed to ``ExtractData`` as its ``output_path``;
            defaults to ``extracted_data.txt``.
        output_path: Path of the transformed file; defaults to
            ``transformed_data.txt``.
    """

    input_path = luigi.Parameter(default="extracted_data.txt")
    output_path = luigi.Parameter(default="transformed_data.txt")

    def requires(self):
        """Depend on an ``ExtractData`` task that writes to ``input_path``."""
        return ExtractData(output_path=self.input_path)

    def output(self):
        """Return the local file target located at ``output_path``."""
        return luigi.LocalTarget(self.output_path)

    def run(self):
        """Copy the input to the output with each line stripped and upper-cased."""
        destination = self.output()
        # Open the upstream file first, then the destination, as two nested
        # context managers so both are closed when the copy finishes.
        with self.input().open("r") as source:
            with destination.open("w") as sink:
                for raw_line in source:
                    shouted = raw_line.strip().upper()
                    sink.write(f"{shouted}\n")
        print(f"Transformed data saved to {destination.path}")
class LoadData(luigi.Task):
    """Final pipeline stage: prefix each transformed line with ``Loaded: ``.

    Parameters:
        input_path: Path handed to ``TransformData`` as its ``output_path``;
            defaults to ``transformed_data.txt``.
        output_path: Path of the final file; defaults to ``final_data.txt``.
    """

    input_path = luigi.Parameter(default="transformed_data.txt")
    output_path = luigi.Parameter(default="final_data.txt")

    def requires(self):
        """Depend on a ``TransformData`` task that writes to ``input_path``."""
        return TransformData(output_path=self.input_path)

    def output(self):
        """Return the local file target located at ``output_path``."""
        return luigi.LocalTarget(self.output_path)

    def run(self):
        """Write every input line to the output, prefixed with ``Loaded: ``."""
        destination = self.output()
        with self.input().open("r") as source:
            with destination.open("w") as sink:
                for record in source:
                    # `record` keeps its own line ending, so none is appended.
                    sink.write(f"Loaded: {record}")
        print(f"Loaded data into {destination.path}")
if __name__ == "__main__":
    # Build only the final task; its requires() chain pulls in
    # TransformData and, through it, ExtractData.
    final_tasks = [LoadData()]
    luigi.build(final_tasks, local_scheduler=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment