Skip to content

Instantly share code, notes, and snippets.

@ashwanthkumar
Created March 11, 2013 11:55
Show Gist options
  • Save ashwanthkumar/5133733 to your computer and use it in GitHub Desktop.
Save ashwanthkumar/5133733 to your computer and use it in GitHub Desktop.
A MapReduce InputFormat for HBase's HFile. Tested on HBase 0.94.2 and Hadoop 1.0.1.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
/**
 * A MapReduce InputFormat for HBase's HFile.
 *
 * <p>Each input HFile is read sequentially by a single record reader, producing one record per
 * {@link KeyValue} in file order. The record key is the KeyValue's row wrapped in an
 * {@link ImmutableBytesWritable}; the record value is the KeyValue itself.
 *
 * <p>HFiles are never split: the record reader scans the entire file and ignores the split's
 * offset and length, so allowing splits would make every split emit the same cells.
 */
public class HFileInputFormat extends FileInputFormat<ImmutableBytesWritable, KeyValue> {

  /** Public no-arg constructor so the format can be instantiated reflectively. */
  public HFileInputFormat() {
  }

  /**
   * Always returns {@code false}: one HFile must map to exactly one split, because the record
   * reader reads the whole file regardless of split boundaries.
   *
   * @param context the job context (unused)
   * @param filename the candidate input file (unused)
   * @return {@code false}
   */
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }

  /**
   * Creates a reader that iterates over every KeyValue of the split's HFile.
   *
   * @param split the split to read (a {@link FileSplit}); consumed in {@code initialize}
   * @param context the task context; consumed in {@code initialize}
   * @return a new, uninitialized record reader
   */
  @Override
  public RecordReader<ImmutableBytesWritable, KeyValue> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  /** Sequentially reads all KeyValues of one HFile. */
  private static class HFileRecordReader
      extends RecordReader<ImmutableBytesWritable, KeyValue> {
    private HFile.Reader reader;
    private HFileScanner scanner;
    /** Number of KeyValues successfully produced so far; drives {@link #getProgress()}. */
    private long entryNumber = 0;
    /** Whether the scanner has been positioned on the first cell yet. */
    private boolean started = false;
    /** Whether the scan has reached the end of the file. */
    private boolean exhausted = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      SchemaMetrics.configureGlobally(conf);
      Path path = ((FileSplit) split).getPath();
      // Resolve the filesystem from the path itself: FileSystem.get(conf) returns the default
      // filesystem, which is wrong when the input lives on a different one.
      FileSystem fs = path.getFileSystem(conf);
      reader = HFile.createReader(fs, path, new CacheConfig(conf));
      reader.loadFileInfo();
      // getScanner(cacheBlocks, pread): a one-pass sequential scan is read without block caching
      // and without positional reads.
      scanner = reader.getScanner(false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (exhausted) {
        // Keep reporting end-of-input instead of possibly seeking back to the first cell.
        return false;
      }
      boolean hasNext;
      if (!started) {
        // nextKeyValue() is called before the first getCurrentKey(), so the first call must
        // position the scanner ON the first cell (seekTo) rather than advance past it (next).
        started = true;
        hasNext = scanner.seekTo();
      } else {
        hasNext = scanner.next();
      }
      if (hasNext) {
        entryNumber++;
      } else {
        exhausted = true;
      }
      return hasNext;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return new ImmutableBytesWritable(scanner.getKeyValue().getRow());
    }

    @Override
    public KeyValue getCurrentValue() throws IOException, InterruptedException {
      return scanner.getKeyValue();
    }

    /**
     * Returns the fraction of the file's entries read so far, clamped to [0, 1].
     *
     * @return progress in [0, 1]; for a file reporting no entries, 0 until the scan ends, then 1
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
      long total = reader.getEntries();
      if (total <= 0) {
        // Avoid dividing by zero on an empty file.
        return exhausted ? 1.0f : 0.0f;
      }
      return Math.min(1.0f, (float) entryNumber / total);
    }

    @Override
    public void close() throws IOException {
      if (reader != null) {
        reader.close();
      }
    }
  }
}
@FlowerBirds
Copy link

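Add a public default constructor (the snippet below also makes the class public and, for HBase 1.x, uses `Cell` instead of `KeyValue`):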

/**
 * A MapReduce InputFormat for HBase's HFile.
 */
public class HFileInputFormat extends FileInputFormat<ImmutableBytesWritable, Cell> {

    public HFileInputFormat() {

    }
...

@FlowerBirds
Copy link

Tested with HBase client 1.2.6 and server 1.2.0-cdh5.16.2.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment