HCatInputFormat.java (@dvasilen, forked from granturing/HCatInputFormat.java)
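A Spark-friendly wrapper around HCatalog's HCatInputFormat. Hadoop Writable keys are not java.io.Serializable, so Spark cannot serialize them directly; the wrapper hands each key back inside Spark's SerializableWritable instead.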
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.spark.SerializableWritable;

/**
 * InputFormat that delegates to HCatalog's HCatInputFormat but wraps each key
 * in Spark's SerializableWritable, since Hadoop Writables are not
 * java.io.Serializable.
 */
public class HCatInputFormat extends InputFormat<SerializableWritable<Writable>, HCatRecord> {

    private final org.apache.hcatalog.mapreduce.HCatInputFormat input;

    public HCatInputFormat() {
        input = new org.apache.hcatalog.mapreduce.HCatInputFormat();
    }

    @Override
    public RecordReader<SerializableWritable<Writable>, HCatRecord> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new HCatRecordReader(input.createRecordReader(split, context));
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        return input.getSplits(context);
    }
}

/**
 * RecordReader that forwards every call to the underlying HCatalog reader,
 * wrapping each key in a SerializableWritable on the way out.
 */
class HCatRecordReader extends RecordReader<SerializableWritable<Writable>, HCatRecord> {

    private final RecordReader<WritableComparable, HCatRecord> reader;

    public HCatRecordReader(RecordReader<WritableComparable, HCatRecord> reader) {
        this.reader = reader;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        reader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return reader.nextKeyValue();
    }

    @Override
    public SerializableWritable<Writable> getCurrentKey() throws IOException, InterruptedException {
        return new SerializableWritable<Writable>(reader.getCurrentKey());
    }

    @Override
    public HCatRecord getCurrentValue() throws IOException, InterruptedException {
        return reader.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}
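With the wrapper compiled and on the classpath, the RDD can be built from the Spark shell as below. Here org.my.package.spark.hcatalog stands in for whatever package the wrapper class was placed in, and "db" and "table" name the Hive database and table to read.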
val f = classOf[org.my.package.spark.hcatalog.HCatInputFormat]  // the wrapper above
val k = classOf[org.apache.spark.SerializableWritable[org.apache.hadoop.io.Writable]]
val v = classOf[org.apache.hcatalog.data.HCatRecord]

// Point HCatInputFormat at the Hive database and table to read.
val conf = new org.apache.hadoop.conf.Configuration()
org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(conf, "db", "table")

// RDD of (wrapped key, HCatRecord) pairs.
val data = sc.newAPIHadoopRDD(conf, f, k, v)
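A hypothetical follow-up, not part of the original gist: the resulting RDD can be consumed like any other pair RDD, and HCatRecord exposes positional field access via get(int). The column index below is illustrative only and depends on the table schema.

// Illustrative only: extract the first column of every record.
// Assumes the table has at least one column; the value's runtime type
// depends on the Hive column type.
val firstColumn = data.map { case (_, record) => record.get(0) }
firstColumn.take(10).foreach(println)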