Skip to content

Instantly share code, notes, and snippets.

@melekes
Last active August 29, 2015 14:13
Show Gist options
  • Save melekes/8ba19e1baca077d67c65 to your computer and use it in GitHub Desktop.
Save melekes/8ba19e1baca077d67c65 to your computer and use it in GitHub Desktop.
Writing to HBase From Hadoop Mapper
/**
 * Driver for a map-only MapReduce job that scans the {@code weather} HBase table with
 * {@link AvgTemperatureMapper} and writes the mapper's {@code Put}s back to the same table
 * through {@code TableOutputFormat}.
 *
 * <p>Run through {@code ToolRunner} so that generic Hadoop options ({@code -D}, {@code -conf},
 * ...) are applied to the job configuration.
 */
public class AvgTemperatureDriver extends Configured implements Tool {

    /** Table that is both scanned by the mapper and targeted by the output format. */
    private static final String TABLE_NAME = "weather";

    /**
     * Rows fetched per scanner RPC. A full-table MapReduce scan benefits from batching rows;
     * 500 is the value used in the HBase reference guide's MapReduce examples.
     */
    private static final int SCAN_CACHING = 500;

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args command-line arguments remaining after generic option parsing (unused)
     * @return {@code 0} if the job completed successfully, {@code 1} otherwise
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(getConf(), "Avg temperature");
        job.setJarByClass(getClass());
        // Map-only: the mapper's Puts go straight to the output format.
        job.setNumReduceTasks(0);

        Scan scan = new Scan();
        scan.setCaching(SCAN_CACHING);
        // A one-pass full scan should not evict hot data from the region servers' block cache.
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(
            Bytes.toBytes(TABLE_NAME),
            scan,
            AvgTemperatureMapper.class,
            ImmutableBytesWritable.class,
            Put.class,
            job
        );

        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point; exits the JVM with the job's status code.
     *
     * @param args command-line arguments, passed to {@code ToolRunner}
     * @throws Exception if the tool fails to run
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvgTemperatureDriver(), args);
        System.exit(exitCode);
    }
}
public class AvgTemperatureMapper extends TableMapper<ImmutableBytesWritable, Put> {
public void map(ImmutableBytesWritable rowkey, Result result, Context context) throws InterruptedException, IOException {
Integer max = getInt(result, "temperatures", "max");
Integer min = getInt(result, "temperatures", "min");
Put put = new Put(rowkey.copyBytes());
put.add(Bytes.toBytes("temperatures"), Bytes.toBytes("avg"), Bytes.toBytes((max + min) / 2.0));
context.write(new ImmutableBytesWritable(rowkey), put);
}
private Integer getInt(Result result, String family, String qualifier) {
Cell cell = result.getColumnLatestCell(Bytes.toBytes(family), Bytes.toBytes(qualifier));
byte[] value = CellUtil.cloneValue(cell);
return ByteBuffer.wrap(value).getInt();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment