Skip to content

Instantly share code, notes, and snippets.

@kishoreg
Forked from ananthdurai/PinotRecord.java
Created January 31, 2018 17:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kishoreg/4426e84de6d5059340912f1d75fbec35 to your computer and use it in GitHub Desktop.
Save kishoreg/4426e84de6d5059340912f1d75fbec35 to your computer and use it in GitHub Desktop.
/*
This is the base Pinot Record
*/
package com.linkedin.pinot.hadoop.io;
import com.linkedin.pinot.core.util.AvroUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
public class PinotRecord implements GenericRecord {
private GenericData.Record _record;
private Schema _avroSchema;
private com.linkedin.pinot.common.data.Schema _pinotSchema;
public PinotRecord(com.linkedin.pinot.common.data.Schema pinotSchema) {
_pinotSchema = pinotSchema;
_avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(_pinotSchema);
_record = new GenericData.Record(_avroSchema);
}
@Override
public void put(String key, Object v) {
_record.put(key, v);
}
@Override
public Object get(String key) {
return _record.get(key);
}
@Override
public void put(int i, Object v) {
_record.put(i, v);
}
@Override
public Object get(int i) {
return _record.get(i);
}
@Override
public Schema getSchema() {
return _avroSchema;
}
public GenericData.Record get() {
return _record;
}
}
/*
This is the serialization contract
*/
package com.linkedin.pinot.hadoop.io;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
public interface PinotRecordSerialization<T> {
void init(Configuration conf, org.apache.avro.Schema schema);
PinotRecord serialize(T t) throws IOException;
T deserialize(PinotRecord record) throws IOException;
void close();
}
/*
Sample JsonPinot Serialization
*/
public static class JsonPinotRecordSerialization<T> implements PinotRecordSerialization<T> {
private ObjectMapper _mapper;
private Schema _schema;
private DatumReader<PinotRecord> _reader;
@Override
public void init(Configuration conf, Schema schema) {
_schema = schema;
_reader = new GenericDatumReader<>(schema);
_mapper = new ObjectMapper();
}
@Override
public PinotRecord serialize(T t) throws IOException {
Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, _mapper.writeValueAsString(t));
return _reader.read(null, decoder);
}
@Override
public T deserialize(PinotRecord record) throws IOException {
return null;
}
@Override
public void close() {
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment