Skip to content

Instantly share code, notes, and snippets.

@kunanit
Created March 7, 2019 23:02
Show Gist options
  • Save kunanit/9d93797ba92f7ecc61ced829ebb5170b to your computer and use it in GitHub Desktop.
Save kunanit/9d93797ba92f7ecc61ced829ebb5170b to your computer and use it in GitHub Desktop.
import datetime
import hashlib
import json
from collections import Counter, defaultdict
from os.path import join

import luigi
# Run the whole pipeline (ReportStep pulls in FilterStep and CountStep):
#   luigi --module tasks ReportStep --local-scheduler
# Clean up generated outputs: rm data/output_*
#   NOTE(review): this relative path assumes the working directory is /app,
#   so that data/ is DATA_DIR — confirm.
# Directory holding the external input files and every generated output file.
DATA_DIR = '/app/data'
def get_hash(file_contents=None, salt=''):
    """Return a short MD5-based tag used to name content-addressed outputs.

    The digest is seeded with ``salt`` and, when ``file_contents`` is given,
    extended with the raw bytes of that file.

    Args:
        file_contents: Path of a file whose bytes are folded into the hash
            (despite its name, this is a path, not the contents themselves).
            Falsy values such as ``None`` or ``''`` skip the file entirely.
        salt: String hashed before any file bytes; encoded as UTF-8.

    Returns:
        The first 8 hex characters of the MD5 digest.

    Raises:
        OSError: If ``file_contents`` names a file that cannot be opened.
    """
    # MD5 serves only as a cheap fingerprint for file names, not for security.
    hasher = hashlib.md5(salt.encode())
    if file_contents:
        with open(file_contents, 'rb') as input_file:
            # Stream fixed-size chunks so a large input is never held in
            # memory all at once; the digest is identical to a single read.
            for chunk in iter(lambda: input_file.read(64 * 1024), b''):
                hasher.update(chunk)
    return hasher.hexdigest()[:8]
class WordsInput(luigi.ExternalTask):
    """Raw word list read by CountStep.

    Declared as an external task: this pipeline never creates the file, it
    only points luigi at its location under DATA_DIR.
    """

    def output(self):
        path = join(DATA_DIR, 'input_words.txt')
        return luigi.LocalTarget(path)
class CountStep(luigi.Task):
    """Count how many times each line of the words input occurs.

    Writes a JSON object mapping each line (word) to its occurrence count.
    Blank lines are counted under the empty-string key.
    """

    def requires(self):
        return WordsInput()

    def output(self):
        # Tag the target with both the input path and the input's bytes, so
        # editing input_words.txt yields a new target name and the counts are
        # recomputed instead of being reused. This matches how FilterStep
        # hashes the contents of its own external input.
        words_target = self.input()
        # Only read the file when it exists: output() is evaluated during
        # luigi's completeness check, and a missing input should surface as
        # an unfulfilled WordsInput dependency, not an error raised here.
        contents_path = words_target.path if words_target.exists() else None
        hex_tag = get_hash(file_contents=contents_path, salt=words_target.path)
        filename = f'output_count_{hex_tag}.txt'
        return luigi.LocalTarget(join(DATA_DIR, filename))

    def run(self):
        with self.input().open() as fi:
            words = fi.read().splitlines()
        # Counter keeps first-seen key order, so the JSON output matches the
        # previous defaultdict-based loop exactly.
        word_counts = Counter(words)
        with self.output().open('w') as out_file:
            json.dump(word_counts, out_file)
class ValidWordsInput(luigi.ExternalTask):
    """Allow-list of words read by FilterStep.

    Declared as an external task: this pipeline never creates the file, it
    only points luigi at its location under DATA_DIR.
    """

    def output(self):
        path = join(DATA_DIR, 'input_valid_words.txt')
        return luigi.LocalTarget(path)
class FilterStep(luigi.Task):
    """Keep only the word counts whose word appears in the valid-words list.

    Reads the JSON counts produced by CountStep and the valid-words file (one
    word per line), and writes the surviving counts as a JSON object.
    """

    def requires(self):
        return {
            'counts': CountStep(),
            'valid_words': ValidWordsInput(),
        }

    def output(self):
        inputs = self.input()
        valid_target = inputs['valid_words']
        # Hash the valid-words file's bytes so editing it yields a new target
        # name; the counts path is the salt because it already carries
        # CountStep's hash tag.
        # Skip the contents while the file is missing: output() is evaluated
        # during luigi's completeness check, and a missing file should surface
        # as an unfulfilled ValidWordsInput dependency, not an exception here.
        contents_path = valid_target.path if valid_target.exists() else None
        hex_tag = get_hash(
            file_contents=contents_path,
            salt=inputs['counts'].path,
        )
        filename = f'output_filter_{hex_tag}.txt'
        return luigi.LocalTarget(join(DATA_DIR, filename))

    def run(self):
        inputs = self.input()
        with inputs['counts'].open() as fi:
            word_counts = json.load(fi)
        with inputs['valid_words'].open() as fi:
            # A set makes each membership test O(1) instead of scanning the
            # whole list once per counted word.
            valid_words = set(fi.read().splitlines())
        filtered_word_counts = {
            word: count
            for word, count in word_counts.items()
            if word in valid_words
        }
        with self.output().open('w') as fo:
            json.dump(filtered_word_counts, fo)
class ReportStep(luigi.Task):
    """Final pipeline step: write a one-line timestamp report.

    The target name is tagged with a hash of FilterStep's output path, so a
    different upstream path produces a different report filename.
    """

    def requires(self):
        return FilterStep()

    def output(self):
        upstream_path = self.input().path
        hex_tag = get_hash(salt=upstream_path)
        return luigi.LocalTarget(
            join(DATA_DIR, f'output_report_{hex_tag}.txt')
        )

    def run(self):
        # The report body is only the generation time; FilterStep's filtered
        # counts are not read here.
        with self.output().open('w') as fo:
            fo.write(str(datetime.datetime.now()) + "\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment