Skip to content

Instantly share code, notes, and snippets.

@benlee
Created January 25, 2013 03:08
Show Gist options
  • Save benlee/4631407 to your computer and use it in GitHub Desktop.
Save benlee/4631407 to your computer and use it in GitHub Desktop.
ThriftSequenceFileOutputFormat
import com.foursquare.base2.MessageLogging
import com.foursquare.hfile.common.HFileUtil.BoundedTBase
import org.apache.hadoop.hbase.io.hfile.{Compression, HFile}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
/**
 * A [[FileOutputFormat]] keyed by `NullWritable` whose values are of type `T`.
 *
 * Record writing is delegated: a `SequenceFileOutputFormat[NullWritable, BytesWritable]`
 * supplies the underlying writer, and a [[ThriftSequenceFileRecordWriter]] wraps it so that
 * callers can write `T` values directly.
 *
 * @tparam T the value type accepted by the record writers this format produces
 */
class ThriftSequenceFileOutputFormat[T <: BoundedTBase]
  extends FileOutputFormat[NullWritable, T] {

  // Delegate format that creates the NullWritable/BytesWritable record writer.
  private[io] val sequenceOutput = new SequenceFileOutputFormat[NullWritable, BytesWritable]()

  /**
   * Builds a record writer for the given task attempt.
   *
   * NOTE(review): the delegate writer is created from `context`, so it presumably uses the
   * job's configured output key/value classes -- confirm the job configures `BytesWritable`
   * as the output value class.
   *
   * @param context the task attempt the writer is created for; also passed to the wrapper
   * @return a writer that accepts `T` values and forwards them to the delegate writer
   */
  override def getRecordWriter(context: TaskAttemptContext): ThriftSequenceFileRecordWriter[T] = {
    val underlying = sequenceOutput.getRecordWriter(context)
    new ThriftSequenceFileRecordWriter[T](underlying, context)
  }
}
/**
 * A [[RecordWriter]] that serializes each `V` value to bytes and forwards it, wrapped in a
 * `BytesWritable`, to an underlying `NullWritable`/`BytesWritable` writer.
 *
 * @param writer  the underlying writer that receives the serialized records
 * @param context the task attempt context supplied at construction; it is not read anywhere
 *                in this class body (`close` uses its own `context` argument) and is kept so
 *                the constructor signature stays unchanged
 * @tparam V the value type accepted by `write`
 */
class ThriftSequenceFileRecordWriter[V <: BoundedTBase](
    writer: RecordWriter[NullWritable, BytesWritable],
    context: TaskAttemptContext) extends RecordWriter[NullWritable, V] {

  // Serializer used to turn each value into the payload of a BytesWritable.
  val valueSerializer = new ThriftSerializerWithClassName[V]()

  /**
   * Serializes `value` and writes it to the underlying writer under `key`.
   *
   * Declared with an explicit `Unit` result type rather than procedure syntax, matching
   * `close` below.
   *
   * @param key   the key passed through unchanged to the underlying writer
   * @param value the value to serialize and write
   */
  override def write(key: NullWritable, value: V): Unit = {
    val serialized = valueSerializer.serialize(value)
    writer.write(key, new BytesWritable(serialized))
  }

  /**
   * Closes the underlying writer.
   *
   * @param context the task attempt context forwarded to the underlying writer's `close`
   */
  override def close(context: TaskAttemptContext): Unit = {
    writer.close(context)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment