Last active
January 9, 2018 01:53
-
-
Save kishoreg/6db55986776e7d3b2d31a96f887f553c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Copyright (C) 2014-2016 LinkedIn Corp. (pinot-core@linkedin.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package com.linkedin.pinot.core.data.readers;

import com.linkedin.pinot.common.data.FieldSpec;
import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.core.data.GenericRow;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class ThriftRecordReader implements RecordReader { | |
private static final Logger LOGGER = LoggerFactory.getLogger(ThriftRecordReader.class); | |
private final Schema _schema; | |
private final ThriftRecordReaderConfig _recordReaderConfig; | |
private final Class<TBase<?, ?>> _thriftClass; | |
/** | |
* For reading the binary thrift objects. | |
*/ | |
private TBinaryProtocol _binaryIn; | |
private BufferedInputStream _bufferIn; | |
private final File _dataFile; | |
private ThriftField _thriftField; | |
Map<String, Integer> _fieldNameToIndexMap; | |
public ThriftRecordReader(File dataFile, Schema schema, ThriftRecordReaderConfig recordReaderConfig) throws IOException, ClassNotFoundException { | |
this._schema = schema; | |
this._dataFile = dataFile; | |
this._recordReaderConfig = recordReaderConfig; | |
this._thriftClass = thriftClassCreator(); | |
this._bufferIn = RecordReaderUtils.getFileBufferStream(dataFile); | |
this._binaryIn = new TBinaryProtocol(new TIOStreamTransport(_bufferIn)); | |
this._thriftField = new ThriftField(); | |
this._fieldNameToIndexMap = new HashMap(); | |
TBase t = this._thriftClass.newInstance(); | |
int index = 0; | |
TFieldIdEnum fieldIdEnum = null; | |
do { | |
TFieldIdEnum fieldIdEnum = t.getFieldForId(index); | |
if(fieldIdEnum != null ){ | |
_fieldNameToIndexMap.put(fieldIdEnum.getFieldName(), index); | |
} | |
index =index + 1; | |
} while(fieldIdEnum!=null); | |
} | |
@Override | |
public boolean hasNext() { | |
_bufferIn.mark(1); | |
int val = 0; | |
try { | |
val = _bufferIn.read(); | |
_bufferIn.reset(); | |
} catch (IOException e) { | |
LOGGER.error("Error in iterating Reader", e); | |
val = -1; | |
} | |
return val != -1; | |
} | |
@Override | |
public GenericRow next() throws IOException { | |
return next(new GenericRow()); | |
} | |
@Override | |
public GenericRow next(GenericRow reuse) throws IOException { | |
try { | |
TBase t = this._thriftClass.newInstance(); | |
t.read(_binaryIn); | |
for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) { | |
String fieldName = fieldSpec.getName(); | |
if(_fieldNameToIndexMap.containsKey(fieldName)){ | |
int tFieldId = _fieldNameToIndexMap.get(fieldName); | |
TFieldIdEnum tFieldIdEnum = t.fieldForId(tFieldId); | |
if(t.isSet(tFieldIdEnum)) { | |
Object value = t.getFieldValue(tFieldIdEnum); | |
reuse.putField(fieldName, value); | |
} | |
} | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
return reuse; | |
} | |
@Override | |
public void rewind() throws IOException { | |
_bufferIn = RecordReaderUtils.getFileBufferStream(_dataFile); | |
_binaryIn = new TBinaryProtocol(new TIOStreamTransport(_bufferIn)); | |
} | |
@Override | |
public Schema getSchema() { | |
return _schema; | |
} | |
@Override | |
public void close() throws IOException { | |
_bufferIn.close(); | |
} | |
private Class<TBase<?, ?>> thriftClassCreator() throws ClassNotFoundException { | |
Class<TBase<?, ?>> tBase = null; | |
if (_recordReaderConfig == null || _recordReaderConfig.get_thriftClass() == null) { | |
throw new IllegalArgumentException(""); | |
} | |
try { | |
tBase = (Class<TBase<?, ?>>) Class.forName(_recordReaderConfig.get_thriftClass()); | |
} catch (Exception e) { | |
throw new ClassNotFoundException("Error in initializing TBase class", e); | |
} | |
return tBase; | |
} | |
private static class ThriftField implements TFieldIdEnum { | |
private short fieldId; | |
private String fieldName; | |
ThriftField() { | |
} | |
ThriftField(short fieldId, String fieldName) { | |
this.fieldId = fieldId; | |
this.fieldName = fieldName; | |
} | |
@Override | |
public short getThriftFieldId() { | |
return fieldId; | |
} | |
@Override | |
public String getFieldName() { | |
return fieldName; | |
} | |
public ThriftField setFieldId(short fieldId) { | |
this.fieldId = fieldId; | |
return this; | |
} | |
public ThriftField setFieldName(String fieldName) { | |
this.fieldName = fieldName; | |
return this; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment