Skip to content

Instantly share code, notes, and snippets.

@danielfrg
Last active October 15, 2018 16:18
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielfrg/7091940 to your computer and use it in GitHub Desktop.
Save danielfrg/7091940 to your computer and use it in GitHub Desktop.
Luigi pipeline: 1. Read a bunch of TDF files from local storage and created a big json file in HDFS 2. Uses a hadoop MR job to count the number of words (this is actually a field on each json object)
import json
import luigi
import luigi.hdfs
import luigi.hadoop
import pandas as pd
import numpy
import pandas
luigi.hadoop.attach(numpy, pandas)
class InputText(luigi.ExternalTask):
part = luigi.IntParameter()
def output(self):
return luigi.LocalTarget('%i.tdf' % self.part)
class GenerateJSON(luigi.Task):
def requires(self):
return [InputText(part) for part in range(10)]
def output(self):
return luigi.hdfs.HdfsTarget('data.json')
def run(self):
f = self.output().open('w')
for file in self.input():
df = pd.read_table(file.open('r'), nrows=5)
for index, row in df.iterrows():
f.write("%s\n" % row.to_json())
# output data
f.close()
class NumberOfWords(luigi.hadoop.JobTask):
def requires(self):
return GenerateJSON()
def output(self):
return luigi.hdfs.HdfsTarget('number_of_words')
def mapper(self, line):
line = json.loads(line)
wc = line['word_count']
yield 'word', wc if wc is not None else 0
def reducer(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
luigi.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment