Skip to content

Instantly share code, notes, and snippets.

@ashwanthkumar
Forked from coltfred/HFileInputFormat.scala
Last active December 14, 2015 18:59
Show Gist options
  • Save ashwanthkumar/5133463 to your computer and use it in GitHub Desktop.
Save ashwanthkumar/5133463 to your computer and use it in GitHub Desktop.
Updating the HFile Reader init for HBase 0.94.2 - Fixing the null CacheConfig
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.io.hfile.{ HFile, HFileScanner, CacheConfig }
import org.apache.hadoop.hbase.io.hfile.HFile.Reader
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.mapreduce.{ JobContext, InputSplit, TaskAttemptContext, RecordReader }
import org.apache.hadoop.mapreduce.lib.input.{ FileInputFormat, FileSplit }
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics
/**
 * A MapReduce InputFormat for HBase's HFile.
 *
 * Input files are never split: each file is read whole by one [[HFileRecordReader]], which emits
 * one record per KeyValue in the file. The record key is that KeyValue's row, and the record
 * value is the KeyValue itself.
 */
class HFileInputFormat extends FileInputFormat[ImmutableBytesWritable, KeyValue] {

  /** HFiles are always read whole, sequentially from their first cell. */
  override def isSplitable(context: JobContext, file: Path): Boolean = false

  /** Returns a new reader; the framework calls `initialize` on it before reading records. */
  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[ImmutableBytesWritable, KeyValue] =
    new HFileRecordReader

  /** Sequentially scans every KeyValue of the single HFile named by a [[FileSplit]]. */
  private class HFileRecordReader extends RecordReader[ImmutableBytesWritable, KeyValue] {
    private var reader: HFile.Reader = _
    private var scanner: HFileScanner = _
    // Number of KeyValues successfully returned by nextKeyValue so far.
    private var entryNumber: Long = 0L
    // Entry count reported by the reader, cached so getProgress never touches the reader.
    private var totalEntries: Long = 0L
    // Set once the scan has run off the end of the file.
    private var exhausted: Boolean = false

    /**
     * Opens the HFile named by `split` and prepares an (as yet unpositioned) scanner over it.
     *
     * @throws IllegalArgumentException if `split` is not a [[FileSplit]]
     */
    def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
      val conf = context.getConfiguration
      // NOTE(review): presumably required by HBase 0.94 before any HFile reader is created.
      SchemaMetrics.configureGlobally(conf)
      val path = split match {
        case fileSplit: FileSplit => fileSplit.getPath
        case other =>
          throw new IllegalArgumentException(
            "HFileInputFormat expects a FileSplit but got " + other.getClass.getName)
      }
      // Resolve the file system from the path itself, so inputs that do not live on the
      // default file system are opened from the right place.
      val fs = path.getFileSystem(conf)
      reader = HFile.createReader(fs, path, new CacheConfig(conf))
      scanner = reader.getScanner(false, false)
      reader.loadFileInfo // This is required or else seekTo throws a NPE
      totalEntries = reader.getEntries
      entryNumber = 0L
      exhausted = false
    }

    /** The row of the current KeyValue; valid only after `nextKeyValue` returned true. */
    def getCurrentKey: ImmutableBytesWritable = new ImmutableBytesWritable(scanner.getKeyValue.getRow)

    /** The current KeyValue; valid only after `nextKeyValue` returned true. */
    def getCurrentValue: KeyValue = scanner.getKeyValue

    /** Fraction of the file's entries returned so far, clamped to [0, 1]. */
    def getProgress: Float =
      math.min(1.0f, entryNumber.toFloat / math.max(1L, totalEntries).toFloat)

    /**
     * Advances to the next KeyValue.
     *
     * @return true if a KeyValue is now current, false once the file has been fully read
     */
    def nextKeyValue(): Boolean =
      if (exhausted) false
      else {
        // The framework calls nextKeyValue before the first getCurrentKey, so the first call
        // must position the scanner on the first cell instead of stepping past it. Once the
        // end is reached we stop for good, instead of letting an un-seeked scanner restart
        // from the first cell.
        val advanced = if (scanner.isSeeked) scanner.next() else scanner.seekTo()
        if (advanced) entryNumber += 1 else exhausted = true
        advanced
      }

    /** Closes the underlying HFile reader; calling this more than once is harmless. */
    def close(): Unit = {
      if (reader != null) {
        reader.close()
        reader = null
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment