Skip to content

Instantly share code, notes, and snippets.

@dehowell
Created April 5, 2010 19:25
Show Gist options
  • Save dehowell/356750 to your computer and use it in GitHub Desktop.
Save dehowell/356750 to your computer and use it in GitHub Desktop.
/**
 * Mapper that re-keys every input record with an {@code ApacheKeyWritable}
 * built from the path of the input file the task is reading.
 *
 * <p>The virtual host, date and machine components of the key are extracted
 * once, in {@link #configure(JobConf)}, from the {@code map.input.file} job
 * property. For each record only the offset component is updated before the
 * record is emitted unchanged as the value.
 */
public class RepartitionMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, ApacheKeyWritable, Text> {

  private static final Log LOG = LogFactory.getLog(RepartitionMapper.class);

  /** Placeholder used for every key component when the file name is not recognized. */
  private static final String UNKNOWN = "UNKNOWN";

  /**
   * Matches the tail of an input path of the form
   * {@code /<virtualHost>/<4 digits>/access.<8 digits>.[<machine>].gz}.
   * Group 1 is the virtual host, group 2 the date, group 3 the machine.
   *
   * <p>Compiled once instead of on every call. The pattern is deliberately
   * compiled without {@code Pattern.COMMENTS}: it contains no whitespace or
   * {@code #}, so that flag had no effect on matching, but it would silently
   * change the meaning of any future edit that introduced a space.
   */
  private static final Pattern INPUT_FILENAME_PATTERN = Pattern.compile(
      "/([^/]+)/\\d{4}/access\\.(\\d{8})\\.\\[([^\\]]+)\\]\\.gz$");

  /**
   * Reused output key. Held per instance rather than in a static field so that
   * mapper instances living in the same JVM never overwrite each other's key.
   */
  private final ApacheKeyWritable outputKey = new ApacheKeyWritable();

  /**
   * Parses an input file path into its virtual host, date and machine parts.
   *
   * @param filename the input file path; may be {@code null}
   * @return a match result whose groups 1, 2 and 3 are the virtual host, the
   *         eight-digit date and the machine name respectively
   * @throws IllegalArgumentException if {@code filename} is {@code null} or
   *         does not end with the expected layout
   */
  public MatchResult parseInputFilename(String filename)
      throws IllegalArgumentException {
    // Pattern.matcher(null) would throw NullPointerException, which callers
    // catching IllegalArgumentException would not handle.
    if (filename == null) {
      throw new IllegalArgumentException("Invalid filename: " + filename);
    }
    Matcher m = INPUT_FILENAME_PATTERN.matcher(filename);
    if (!m.find()) {
      throw new IllegalArgumentException("Invalid filename: " + filename);
    }
    return m.toMatchResult();
  }

  /**
   * Initializes the output key from the {@code map.input.file} job property.
   * When the property is missing or its value does not match the expected
   * layout, every key component is set to {@code "UNKNOWN"} and a warning is
   * logged; the task still runs.
   *
   * @param job the job configuration for this task
   */
  public void configure(JobConf job) {
    String inputFile = job.get("map.input.file");
    LOG.info("Working on: " + inputFile);
    try {
      MatchResult mr = parseInputFilename(inputFile);
      String virtualHost = mr.group(1);
      String date = mr.group(2);
      String machine = mr.group(3);
      outputKey.set(new Text(virtualHost),
          new Text(date),
          new Text(machine),
          new LongWritable(0));
    } catch (IllegalArgumentException e) {
      outputKey.set(new Text(UNKNOWN),
          new Text(UNKNOWN),
          new Text(UNKNOWN),
          new LongWritable(0));
      LOG.warn("Unrecognized input file name format: " + inputFile);
    }
  }

  /**
   * Emits the record unchanged under the file-derived key, with the key's
   * offset component set to the incoming record key.
   *
   * @param key the incoming record key, stored as the offset of the output key
   * @param value the record, passed through as the output value
   * @param output collector receiving the re-keyed record
   * @param reporter progress reporter (unused)
   * @throws IOException if the collector fails to accept the record
   */
  public void map(LongWritable key, Text value,
      OutputCollector<ApacheKeyWritable, Text> output, Reporter reporter)
      throws IOException {
    outputKey.setOffset(key);
    output.collect(outputKey, value);
  }
}
/**
 * Reducer that writes each key's records to a multi named output whose name
 * is derived from the key's virtual host and date string.
 *
 * <p>The emitted key is {@code <machine>:<offset>}; the values are passed
 * through unchanged. Records are written through {@code MultipleOutputs}
 * under the named output {@code "vh"} rather than through the collector
 * passed to {@link #reduce}.
 */
public class RepartitionReducer extends MapReduceBase
    implements Reducer<ApacheKeyWritable, Text, Text, Text> {

  // Logger is bound to this class (it previously named RepartitionMapper).
  private static final Log LOG = LogFactory.getLog(RepartitionReducer.class);

  /** Name of the multi named output the records are written to. */
  private static final String NAMED_OUTPUT = "vh";

  /**
   * Reused output key. Held per instance rather than in a static field so that
   * reducer instances living in the same JVM never overwrite each other's key.
   */
  private final Text outputKey = new Text();

  private MultipleOutputs multipleOutputs;
  private Hex hex;

  /**
   * Creates the {@code MultipleOutputs} helper and the hex encoder for this task.
   *
   * @param conf the job configuration for this task
   */
  @Override
  public void configure(JobConf conf) {
    multipleOutputs = new MultipleOutputs(conf);
    hex = new Hex();
  }

  /**
   * Writes every value of {@code key} to the named output selected by the
   * key's virtual host and date string.
   *
   * @param key the grouping key supplying machine, offset, virtual host and date
   * @param values the records grouped under {@code key}
   * @param output the job's default collector (unused; records go to the
   *        multi named output instead)
   * @param reporter reporter handed to {@code MultipleOutputs} when obtaining
   *        the collector
   * @throws IOException if obtaining the collector or writing a record fails
   */
  public void reduce(ApacheKeyWritable key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    outputKey.set(key.getMachine() + ":" + key.getOffset());

    /* Hex encode the virtual host for use in the output file name, because
     * MultipleOutputs only allows characters in [a-zA-Z0-9].
     * Charsets are named explicitly so the resulting name does not depend on
     * the platform default of the node running the task. The charset-name
     * overloads throw UnsupportedEncodingException, an IOException subtype
     * already covered by this method's throws clause.
     */
    byte[] virtualHostBytes = key.getVirtualHost().toString().getBytes("UTF-8");
    String hexEncodedVirtualHost =
        new String(hex.encode(virtualHostBytes), "US-ASCII");

    // getCollector returns a raw OutputCollector; this reducer only ever
    // writes Text keys and Text values to it.
    @SuppressWarnings("unchecked")
    OutputCollector<Text, Text> collector = multipleOutputs.getCollector(
        NAMED_OUTPUT,
        hexEncodedVirtualHost + key.getDateString(),
        reporter);

    while (values.hasNext()) {
      collector.collect(outputKey, values.next());
    }
  }

  /**
   * Closes the {@code MultipleOutputs} helper when the task finishes so the
   * writers it opened are flushed and released. Replaces the no-op
   * {@code close()} inherited from {@code MapReduceBase}.
   *
   * @throws IOException if closing any of the named outputs fails
   */
  public void close() throws IOException {
    // configure() may not have run if task setup failed early.
    if (multipleOutputs != null) {
      multipleOutputs.close();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment