Skip to content

Instantly share code, notes, and snippets.

@kishoreg
Last active January 9, 2018 01:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kishoreg/6db55986776e7d3b2d31a96f887f553c to your computer and use it in GitHub Desktop.
Save kishoreg/6db55986776e7d3b2d31a96f887f553c to your computer and use it in GitHub Desktop.
/**
* Copyright (C) 2014-2016 LinkedIn Corp. (pinot-core@linkedin.com)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.linkedin.pinot.core.data.readers;
import com.linkedin.pinot.common.data.FieldSpec;
import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.core.data.GenericRow;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * {@link RecordReader} that reads consecutive binary-protocol Thrift objects from a data file
 * and copies the fields named in the schema into {@link GenericRow}s.
 *
 * <p>The Thrift class to deserialize is named by {@link ThriftRecordReaderConfig}. At construction
 * time the reader instantiates that class once and probes its field ids to build a
 * field-name to field-id lookup, which {@link #next(GenericRow)} then uses for every record.
 */
public class ThriftRecordReader implements RecordReader {
  private static final Logger LOGGER = LoggerFactory.getLogger(ThriftRecordReader.class);

  /** First field id probed when discovering the fields of the thrift class. */
  private static final int FIRST_THRIFT_FIELD_ID = 1;

  private final Schema _schema;
  private final ThriftRecordReaderConfig _recordReaderConfig;
  private final Class<TBase<?, ?>> _thriftClass;
  private final File _dataFile;

  /** Maps a thrift field name to the numeric field id it was discovered under. */
  private final Map<String, Integer> _fieldNameToIndexMap;

  /**
   * For reading the binary thrift objects.
   */
  private TBinaryProtocol _binaryIn;
  private BufferedInputStream _bufferIn;

  /**
   * Creates a reader over {@code dataFile}.
   *
   * @param dataFile file containing the serialized thrift objects
   * @param schema schema whose field specs select which thrift fields are copied into rows
   * @param recordReaderConfig config naming the thrift class to deserialize
   * @throws IOException if the data file cannot be opened
   * @throws ClassNotFoundException if the configured thrift class cannot be loaded
   * @throws IllegalArgumentException if the config or its class name is missing, or the class
   *     is not a {@link TBase} or cannot be instantiated
   */
  public ThriftRecordReader(File dataFile, Schema schema, ThriftRecordReaderConfig recordReaderConfig)
      throws IOException, ClassNotFoundException {
    _schema = schema;
    _dataFile = dataFile;
    _recordReaderConfig = recordReaderConfig;
    _thriftClass = thriftClassCreator();
    _fieldNameToIndexMap = buildFieldNameToIndexMap();
    // Open the file last: every step above can throw, and failing after the stream was opened
    // would leak the file handle because the caller never receives a reader to close.
    _bufferIn = RecordReaderUtils.getFileBufferStream(dataFile);
    _binaryIn = new TBinaryProtocol(new TIOStreamTransport(_bufferIn));
  }

  /**
   * Reports whether at least one more byte is available, peeking via mark/reset so the byte is
   * not consumed. An I/O error while peeking is logged and reported as "no more records".
   */
  @Override
  public boolean hasNext() {
    _bufferIn.mark(1);
    int peeked;
    try {
      peeked = _bufferIn.read();
      _bufferIn.reset();
    } catch (IOException e) {
      LOGGER.error("Error in iterating Reader", e);
      return false;
    }
    return peeked != -1;
  }

  /**
   * Reads the next record into a freshly allocated row.
   *
   * @throws IOException if the record cannot be instantiated or deserialized
   */
  @Override
  public GenericRow next() throws IOException {
    return next(new GenericRow());
  }

  /**
   * Reads the next thrift object and copies every schema field that is set on it into
   * {@code reuse}. Schema fields that the thrift class does not define, or that are unset on
   * this record, are not written to the row.
   *
   * @param reuse row to populate
   * @return {@code reuse}
   * @throws IOException if the record cannot be instantiated or deserialized; the underlying
   *     failure is attached as the cause
   */
  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public GenericRow next(GenericRow reuse) throws IOException {
    // Raw TBase is required here: with wildcard type arguments the field enum returned by
    // fieldForId could not be passed back into isSet/getFieldValue.
    TBase record;
    try {
      record = newThriftInstance();
      record.read(_binaryIn);
    } catch (Exception e) {
      // Surface the failure instead of returning a partially filled row.
      throw new IOException("Error reading thrift record from " + _dataFile, e);
    }

    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
      String fieldName = fieldSpec.getName();
      Integer fieldId = _fieldNameToIndexMap.get(fieldName);
      if (fieldId == null) {
        continue;
      }
      TFieldIdEnum field = record.fieldForId(fieldId);
      if (record.isSet(field)) {
        reuse.putField(fieldName, record.getFieldValue(field));
      }
    }
    return reuse;
  }

  /**
   * Restarts reading from the beginning of the data file. The current stream is closed before
   * a new one is opened so the old file handle is not leaked.
   */
  @Override
  public void rewind() throws IOException {
    _bufferIn.close();
    _bufferIn = RecordReaderUtils.getFileBufferStream(_dataFile);
    _binaryIn = new TBinaryProtocol(new TIOStreamTransport(_bufferIn));
  }

  @Override
  public Schema getSchema() {
    return _schema;
  }

  /** Closes the underlying file stream. */
  @Override
  public void close() throws IOException {
    _bufferIn.close();
  }

  /**
   * Instantiates the thrift class once and records the name and id of each field it reports.
   *
   * <p>Ids are probed upwards from {@link #FIRST_THRIFT_FIELD_ID} and probing stops at the first
   * id for which {@code fieldForId} returns {@code null}; fields declared after a gap in the id
   * sequence are therefore not mapped.
   */
  @SuppressWarnings("rawtypes")
  private Map<String, Integer> buildFieldNameToIndexMap() {
    TBase prototype;
    try {
      prototype = newThriftInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Unable to instantiate thrift class " + _thriftClass.getName(), e);
    }

    Map<String, Integer> fieldNameToIndex = new HashMap<String, Integer>();
    int fieldId = FIRST_THRIFT_FIELD_ID;
    TFieldIdEnum field;
    while ((field = prototype.fieldForId(fieldId)) != null) {
      fieldNameToIndex.put(field.getFieldName(), fieldId);
      fieldId++;
    }
    return fieldNameToIndex;
  }

  /** Creates a new, empty instance of the configured thrift class via its no-arg constructor. */
  @SuppressWarnings("rawtypes")
  private TBase newThriftInstance() throws ReflectiveOperationException {
    return _thriftClass.getDeclaredConstructor().newInstance();
  }

  /**
   * Loads the thrift class named in the reader config.
   *
   * @throws IllegalArgumentException if the config or class name is missing, or the loaded class
   *     does not implement {@link TBase}
   * @throws ClassNotFoundException if the named class cannot be loaded
   */
  @SuppressWarnings("unchecked")
  private Class<TBase<?, ?>> thriftClassCreator() throws ClassNotFoundException {
    if (_recordReaderConfig == null || _recordReaderConfig.get_thriftClass() == null) {
      throw new IllegalArgumentException("Thrift record reader config must specify the thrift class name");
    }
    String className = _recordReaderConfig.get_thriftClass();
    Class<?> candidate;
    try {
      candidate = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new ClassNotFoundException("Error in initializing TBase class " + className, e);
    }
    // Fail here with a clear message rather than with a ClassCastException on first use.
    if (!TBase.class.isAssignableFrom(candidate)) {
      throw new IllegalArgumentException("Class " + className + " does not implement TBase");
    }
    return (Class<TBase<?, ?>>) candidate;
  }

  /**
   * Simple mutable {@link TFieldIdEnum} holding a field id and name.
   * Not referenced by the reader itself at present.
   */
  private static class ThriftField implements TFieldIdEnum {
    private short fieldId;
    private String fieldName;

    ThriftField() {
    }

    ThriftField(short fieldId, String fieldName) {
      this.fieldId = fieldId;
      this.fieldName = fieldName;
    }

    @Override
    public short getThriftFieldId() {
      return fieldId;
    }

    @Override
    public String getFieldName() {
      return fieldName;
    }

    /** Sets the field id and returns this instance for chaining. */
    public ThriftField setFieldId(short fieldId) {
      this.fieldId = fieldId;
      return this;
    }

    /** Sets the field name and returns this instance for chaining. */
    public ThriftField setFieldName(String fieldName) {
      this.fieldName = fieldName;
      return this;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment