Skip to content

Instantly share code, notes, and snippets.

@meyarivan
Created September 23, 2014 16:38
Show Gist options
  • Save meyarivan/d690da931f1d398b1eff to your computer and use it in GitHub Desktop.
Save meyarivan/d690da931f1d398b1eff to your computer and use it in GitHub Desktop.
Simple Hadoop streaming job (mrjob) that processes raw weblogs, counting records per value of their 8th tab-separated field.
#!/usr/bin/env python
import sys, os
import codecs
import datetime
import mrjob.job
import mrjob.protocol
import simplejson as json
import math
class WeblogJob(mrjob.job.MRJob):
    """Count weblog records grouped by one tab-separated field.

    Each input line is split on tabs and the field at index ``KEY_FIELD``
    (the 8th field by default) is used as the grouping key. The job emits
    one output line per distinct key: the key, a tab, then the number of
    input lines that carried that key.
    """

    # RawValueProtocol writes only the value, so the reducer's (empty) key
    # is discarded and each output line is exactly the formatted value.
    OUTPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
    # Read Hadoop SequenceFiles, exposing each record as text.
    # NOTE(review): with streaming, the SequenceFile key and value are
    # presumably joined by a tab, so field indices may count the record
    # key as field 0 -- confirm against the actual log layout.
    HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.SequenceFileAsTextInputFormat'
    # Zero-based index of the tab-separated field used as the grouping key.
    # Subclasses may override it to aggregate on a different column.
    KEY_FIELD = 7

    def mapper(self, _, line):
        """Emit ``(field, 1)`` for the ``KEY_FIELD``-th field of ``line``.

        Lines with too few tab-separated fields are skipped and counted
        under the ``errors``/``all`` Hadoop counter instead of failing
        the task.
        """
        try:
            # maxsplit stops splitting just past the field we need; the
            # selected field is identical to a full split's.
            key = line.split('\t', self.KEY_FIELD + 1)[self.KEY_FIELD]
        except IndexError:
            # Malformed (short) record: record it and move on.
            self.increment_counter("errors", "all")
        else:
            yield key, 1

    def combiner(self, key, vals):
        """Pre-aggregate the partial counts for ``key`` on the map side."""
        yield key, sum(vals)

    def reducer(self, key, vals):
        """Emit the total count for ``key`` as a ``key<TAB>count`` line."""
        yield '', '%s\t%d' % (key, sum(vals))
# Script entry point: hand control to mrjob's command-line runner.
if __name__ == '__main__':
    WeblogJob.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment