Created
March 12, 2025 17:35
-
-
Save akashrchandran/36911e11cdb243c4a70356901f66fe1f to your computer and use it in GitHub Desktop.
A simple example showing how Luigi, a batch-processing framework, works.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import luigi | |
class ExtractData(luigi.Task):
    """First pipeline stage: write a fixed list of sample words to a text file."""

    # Where the extracted words are written; may be overridden per task instance.
    output_path = luigi.Parameter(default="extracted_data.txt")

    def output(self):
        """Return the local file target located at ``output_path``."""
        return luigi.LocalTarget(self.output_path)

    def run(self):
        """Write each sample word on its own line, then report the destination."""
        words = ("hello", "world", "luigi", "task", "example")
        target = self.output()
        with target.open("w") as sink:
            # One newline-terminated entry per word, emitted in a single write.
            sink.write("".join(f"{word}\n" for word in words))
        print(f"Extracted data to {target.path}")
class TransformData(luigi.Task):
    """Second pipeline stage: upper-case every line produced by ``ExtractData``."""

    # File the upstream ExtractData task is asked to write (and this task reads).
    input_path = luigi.Parameter(default="extracted_data.txt")
    # File that receives the upper-cased lines.
    output_path = luigi.Parameter(default="transformed_data.txt")

    def requires(self):
        """Depend on an ``ExtractData`` task that writes to ``input_path``."""
        return ExtractData(output_path=self.input_path)

    def output(self):
        """Return the local file target located at ``output_path``."""
        return luigi.LocalTarget(self.output_path)

    def run(self):
        """Copy the input to the output, trimming and upper-casing each line."""
        source = self.input()
        target = self.output()
        with source.open("r") as reader:
            with target.open("w") as writer:
                for raw in reader:
                    # strip() drops the trailing newline (and any surrounding
                    # whitespace), so a fresh newline is appended on output.
                    shouted = raw.strip().upper()
                    writer.write(shouted + "\n")
        print(f"Transformed data saved to {target.path}")
class LoadData(luigi.Task):
    """Final pipeline stage: prefix every transformed line with ``Loaded: ``."""

    # File the upstream TransformData task is asked to write (and this task reads).
    input_path = luigi.Parameter(default="transformed_data.txt")
    # File that receives the prefixed lines.
    output_path = luigi.Parameter(default="final_data.txt")

    def requires(self):
        """Depend on a ``TransformData`` task that writes to ``input_path``."""
        return TransformData(output_path=self.input_path)

    def output(self):
        """Return the local file target located at ``output_path``."""
        return luigi.LocalTarget(self.output_path)

    def run(self):
        """Copy the input to the output with a ``Loaded: `` prefix on each line."""
        source = self.input()
        target = self.output()
        with source.open("r") as reader:
            with target.open("w") as writer:
                for raw in reader:
                    # Lines are written as read, so each keeps its own line ending.
                    writer.write("Loaded: " + raw)
        print(f"Loaded data into {target.path}")
if __name__ == "__main__":
    # Build the final task with the in-process scheduler; its requires() chain
    # pulls in the upstream tasks.
    # luigi.build() hands back a success flag. Propagate a falsy result as a
    # non-zero exit status so shells, cron jobs and CI can tell the run failed
    # instead of always seeing exit code 0.
    if not luigi.build([LoadData()], local_scheduler=True):
        raise SystemExit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment