Skip to content

Instantly share code, notes, and snippets.

@leifwickland
Created August 2, 2011 14:34
Show Gist options
  • Save leifwickland/1120311 to your computer and use it in GitHub Desktop.
Save leifwickland/1120311 to your computer and use it in GitHub Desktop.
Allows an HFile to be used as the input to MapReduce.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.io.hfile.{HFile,HFileScanner}
import org.apache.hadoop.hbase.io.hfile.HFile.Reader
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.mapreduce.{JobContext,InputSplit,TaskAttemptContext,RecordReader}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat,FileSplit}
/**
 * Allows an HFile to be used as the input to a MapReduce job.
 *
 * Each record is one `KeyValue` from the file; the record key is that
 * KeyValue's row wrapped in an `ImmutableBytesWritable`.
 */
class HFileInputFormat extends FileInputFormat[ImmutableBytesWritable, KeyValue] {

  /**
   * Always false: the record reader below scans a file from its first entry
   * and ignores split offsets, so a file must be handed to a single reader.
   */
  override def isSplitable(context: JobContext, file: Path): Boolean = false

  /** Returns a fresh, not yet initialized reader; the framework calls `initialize` on it. */
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[ImmutableBytesWritable, KeyValue] =
    new HFileRecordReader

  /** Iterates over every KeyValue of the single HFile named by a `FileSplit`. */
  private class HFileRecordReader extends RecordReader[ImmutableBytesWritable, KeyValue] {
    // Assigned in initialize(); null until then because the Hadoop API
    // constructs the reader before it knows the split.
    private var reader: HFile.Reader = _
    private var scanner: HFileScanner = _

    // Number of entries handed out so far. A Long, because an unsplittable
    // HFile may be large enough for an Int counter to overflow.
    private var entriesRead: Long = 0L

    // True while the entry the scanner was positioned on by seekTo() has not
    // been returned yet. Without this the first nextKeyValue() would advance
    // past that entry and the first KeyValue of the file would be lost.
    private var firstEntryPending: Boolean = false

    // True once the scanner has no more entries (or before initialize()).
    // Prevents calling next() on a scanner that is not positioned on an entry.
    private var exhausted: Boolean = true

    /** Closes the underlying HFile reader, if one was opened. Safe to call more than once. */
    override def close(): Unit = {
      if (reader != null) {
        reader.close()
        reader = null
      }
    }

    /** Row of the KeyValue the scanner is currently positioned on. */
    override def getCurrentKey: ImmutableBytesWritable =
      new ImmutableBytesWritable(scanner.getKeyValue.getRow)

    /** The KeyValue the scanner is currently positioned on. */
    override def getCurrentValue: KeyValue = scanner.getKeyValue

    /**
     * Fraction of the file's entries returned so far, in the range [0, 1].
     * Returns 0 if the reader has not been initialized (or was closed).
     */
    override def getProgress: Float =
      if (reader == null) 0.0f
      else {
        // Guard against division by zero for an empty file.
        val totalEntries = math.max(1L, reader.getEntries.toLong)
        math.min(1.0f, entriesRead.toFloat / totalEntries.toFloat)
      }

    /**
     * Opens the HFile named by `split` and positions the scanner on its first entry.
     *
     * @param split   must be a `FileSplit`; only its path is used
     * @param context supplies the job configuration used to resolve the file system
     */
    override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
      val path = split.asInstanceOf[FileSplit].getPath
      // Resolve the file system from the path itself so files that do not
      // live on the configured default file system can still be read.
      val fs = path.getFileSystem(context.getConfiguration)
      reader = new HFile.Reader(fs, path, null, false)
      scanner = reader.getScanner(false, false)
      reader.loadFileInfo() // This is required or else seekTo throws a NPE
      // seekTo is required before scanner.next can be used. Its result says
      // whether the scanner now sits on a first entry (false for an empty file).
      firstEntryPending = scanner.seekTo()
      exhausted = !firstEntryPending
      entriesRead = 0L
    }

    /**
     * Advances to the next KeyValue.
     *
     * @return true if a KeyValue is available through getCurrentKey/getCurrentValue,
     *         false once the file has been fully read
     */
    override def nextKeyValue(): Boolean =
      if (exhausted) false
      else {
        val hasEntry =
          if (firstEntryPending) {
            // The scanner already sits on the first entry; hand it out as is.
            firstEntryPending = false
            true
          } else {
            scanner.next()
          }
        // Count only entries that were actually produced, so progress never
        // runs past 1.0 when the framework polls after the end of the file.
        if (hasEntry) entriesRead += 1 else exhausted = true
        hasEntry
      }
  }
}
@coltfred
Copy link

This will skip the first entry of an HFile. I've forked this gist and fixed the defect.

@stepinto
Copy link

entryNumber will overflow when the HFile is extremely large (because the file is not splittable).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment