Skip to content

Instantly share code, notes, and snippets.

@granturing
Last active September 28, 2016 14:24
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save granturing/7201912 to your computer and use it in GitHub Desktop.
HCatalog InputFormat wrapper to use with Spark (FYI for those finding this now, this was originally written pre-SparkSQL)
/**
 * Hadoop {@link InputFormat} that delegates to the HCatalog input format,
 * re-exposing each key wrapped in a {@link SerializableWritable} so Spark
 * can serialize it when shipping records across the cluster.
 */
public class HCatInputFormat extends InputFormat<SerializableWritable<Writable>, HCatRecord> {

    /** Underlying HCatalog input format that performs the real work. */
    private final org.apache.hcatalog.mapreduce.HCatInputFormat delegate;

    public HCatInputFormat() {
        delegate = new org.apache.hcatalog.mapreduce.HCatInputFormat();
    }

    /**
     * Creates a reader for one split, wrapping the HCatalog reader so that
     * keys come back as {@code SerializableWritable<Writable>}.
     */
    @Override
    public RecordReader<SerializableWritable<Writable>, HCatRecord> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new HCatRecordReader(delegate.createRecordReader(split, context));
    }

    /** Delegates split computation directly to the HCatalog input format. */
    @Override
    public List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException {
        return delegate.getSplits(context);
    }
}
/**
 * {@link RecordReader} adapter: forwards every call to the wrapped HCatalog
 * reader and wraps each key in a {@link SerializableWritable} on the way out.
 * All lifecycle and progress semantics are those of the wrapped reader.
 */
class HCatRecordReader extends RecordReader<SerializableWritable<Writable>, HCatRecord> {

    /** Reader produced by the HCatalog input format; all calls delegate here. */
    private final RecordReader<WritableComparable, HCatRecord> wrapped;

    public HCatRecordReader(RecordReader<WritableComparable, HCatRecord> reader) {
        this.wrapped = reader;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        wrapped.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return wrapped.nextKeyValue();
    }

    /** Current key, wrapped so Spark can Java-serialize the Writable. */
    @Override
    public SerializableWritable<Writable> getCurrentKey()
            throws IOException, InterruptedException {
        return new SerializableWritable<Writable>(wrapped.getCurrentKey());
    }

    @Override
    public HCatRecord getCurrentValue() throws IOException, InterruptedException {
        return wrapped.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return wrapped.getProgress();
    }

    @Override
    public void close() throws IOException {
        wrapped.close();
    }
}
// Usage from the Spark shell: read an HCatalog-managed table as an RDD.
// The wrapper input format yields (SerializableWritable[Writable], HCatRecord) pairs.
val inputFormatClass = classOf[org.my.package.spark.hcatalog.HCatInputFormat]
val keyClass = classOf[org.apache.spark.SerializableWritable[org.apache.hadoop.io.Writable]]
val valueClass = classOf[org.apache.hcatalog.data.HCatRecord]
// Point the job configuration at the desired Hive database/table via HCatalog.
val conf = new org.apache.hadoop.conf.Configuration()
org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(conf, "db", "table")
val data = sc.newAPIHadoopRDD(conf, inputFormatClass, keyClass, valueClass)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment