Parquet columns reader1
package com.company.grid.lookup;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
import parquet.io.api.PrimitiveConverter;
import parquet.schema.MessageType;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
final class DimColumn {
  private volatile Object arrayList;
  private String name;
  private Class<?> typeOfArray;
  private int totalSize;
  private ColumnDescriptor columnDescriptor;

  DimColumn(ColumnDescriptor columnDescriptor, int totalSize) {
    this.columnDescriptor = columnDescriptor;
    this.name = columnDescriptor.getPath()[0];
    this.typeOfArray = columnDescriptor.getType().javaType;
    this.totalSize = totalSize;
    // Pre-allocate one array of the column's Java type, large enough for every row.
    this.arrayList = Array.newInstance(typeOfArray, totalSize);
  }

  public ColumnDescriptor getColumnDescriptor() {
    return columnDescriptor;
  }

  public Object getArrayList() {
    return arrayList;
  }

  public String getName() {
    return name;
  }

  public int getTotalSize() {
    return totalSize;
  }
}
public class RfiParquetFileReader {
  ParquetMetadata metaData;
  MessageType schema;
  private static final Charset UTF8 = Charset.forName("UTF-8");
  private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();

  public RfiParquetFileReader(String fileName) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
    conf.set("fs.file.impl", LocalFileSystem.class.getName());
    Path filePath = new Path(fileName);

    // Read the footer once to get the schema and the row-group (block) metadata.
    metaData = ParquetFileReader.readFooter(conf, filePath);
    schema = metaData.getFileMetaData().getSchema();
    List<BlockMetaData> blocks = metaData.getBlocks();

    // Size the per-column arrays for all rows across every row group, not just the first block.
    int totalSize = 0;
    for (BlockMetaData block : blocks) {
      totalSize += (int) block.getRowCount();
    }

    List<ColumnDescriptor> columnDescriptors = schema.getColumns();
    List<DimColumn> dimColumns = makeDimColumns(columnDescriptors, totalSize);

    // Open the file once and walk the row groups, loading every column as we go.
    ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columnDescriptors);
    PageReadStore pageReadStore = fileReader.readNextRowGroup();
    int index = 0;
    while (pageReadStore != null) {
      ColumnReadStoreImpl columnReadStoreImpl = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
      index = load(dimColumns, columnReadStoreImpl, index);
      pageReadStore = fileReader.readNextRowGroup();
    }

    // Spot-check: print five random values from each column.
    Random rand = new Random();
    for (DimColumn dimColumn : dimColumns) {
      System.out.println(dimColumn.getName());
      for (int i = 0; i < 5; i++) {
        index = rand.nextInt(totalSize);
        System.out.println("Index: " + index + " Value: " + Array.get(dimColumn.getArrayList(), index));
      }
      System.out.println("--------");
    }
  }
  public String getSchema() {
    return schema.toString();
  }

  public static void main(String[] args) {
    String dirName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/";
    // String[] files = {"1.campaigns.parquet", "13.assigned_conversion.parquet"};
    String[] files = {"13.assigned_conversion.parquet"};
    try {
      long startTime = System.currentTimeMillis();
      for (String file : files) {
        new RfiParquetFileReader(dirName + file);
        System.out.println("========================================================================");
      }
      long endTime = System.currentTimeMillis();
      System.out.println("Time taken: " + (endTime - startTime) + "ms");
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  public ArrayList<DimColumn> makeDimColumns(List<ColumnDescriptor> columnDescriptors, int totalSize) {
    ArrayList<DimColumn> dimColumns = new ArrayList<DimColumn>();
    for (ColumnDescriptor columnDescriptor : columnDescriptors) {
      dimColumns.add(new DimColumn(columnDescriptor, totalSize));
    }
    return dimColumns;
  }
  public int load(List<DimColumn> dimColumns, ColumnReadStoreImpl columnReadStore, int startIndex) throws IOException {
    int index = startIndex;
    for (DimColumn dc : dimColumns) {
      index = startIndex;
      int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
      ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
      // System.out.println(dc.getTotalSize() + " : " + columnReader.getTotalValueCount() + " - " + dc.getName());
      for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
        // Read the definition level for the current value on every iteration; a value is present only at the max definition level.
        int definitionLevel = columnReader.getCurrentDefinitionLevel();
        if (definitionLevel == maxDefinitionLevel) {
          switch (dc.getColumnDescriptor().getType()) {
            case BINARY:
              // Debug print of the decoded string value.
              String str = new String(columnReader.getBinary().getBytes(), "UTF-8");
              System.out.println(index + " : " + dc.getName() + " : " + str);
              Array.set(dc.getArrayList(), index, columnReader.getBinary()); break;
            case BOOLEAN: Array.set(dc.getArrayList(), index, columnReader.getBoolean()); break;
            case DOUBLE: Array.set(dc.getArrayList(), index, columnReader.getDouble()); break;
            case FLOAT: Array.set(dc.getArrayList(), index, columnReader.getFloat()); break;
            case INT32: Array.set(dc.getArrayList(), index, columnReader.getInteger()); break;
            case INT64: Array.set(dc.getArrayList(), index, columnReader.getLong()); break;
            case INT96: Array.set(dc.getArrayList(), index, binaryToBigInteger(columnReader.getBinary())); break;
            // case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
          }
        }
        columnReader.consume();
        index += 1;
      }
    }
    // index already equals startIndex plus the number of values consumed in this row group.
    return index;
  }
  // public static String binaryToString(Binary value) {
  //   byte[] data = value.getBytes();
  //   if (data == null) return null;
  //
  //   try {
  //     CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
  //     return buffer.toString();
  //   } catch (Throwable th) {
  //   }
  //
  //   return "";
  // }
  public static BigInteger binaryToBigInteger(Binary value) {
    byte[] data = value.getBytes();
    if (data == null) return null;
    return new BigInteger(data);
  }

  // No-op converters: values are read directly through ColumnReader, so the converters never materialize records.
  private static final class DumpGroupConverter extends GroupConverter {
    @Override public void start() { }
    @Override public void end() { }
    @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
  }

  private static final class DumpConverter extends PrimitiveConverter {
    @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
  }
}
@tispratik commented Aug 28, 2014

This version of the reader throws an exception when reading some values from certain columns.

Exception in thread "main" parquet.io.ParquetDecodingException: Can't read value in column [description] BINARY at value 44899 out of 57096, 44899 out of 57096 in currentPage. repetition level: 0, definition level: 1
    at parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:450)
    at parquet.column.impl.ColumnReaderImpl.getBinary(ColumnReaderImpl.java:398)
    at com.rocketfuel.grid.lookup_new.RfiParquetFileReader.load(RfiParquetFileReader.java:147)
    at com.rocketfuel.grid.lookup_new.RfiParquetFileReader.<init>(RfiParquetFileReader.java:87)
    at com.rocketfuel.grid.lookup_new.RfiParquetFileReader.main(RfiParquetFileReader.java:114)
Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
    at parquet.Preconditions.checkArgument(Preconditions.java:47)
    at parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:80)
    at parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:62)
    at parquet.column.values.dictionary.DictionaryValuesReader.readBytes(DictionaryValuesReader.java:82)
    at parquet.column.impl.ColumnReaderImpl$2$6.read(ColumnReaderImpl.java:295)
    at parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:446)
    ... 4 more
@tispratik commented Aug 29, 2014

A similar issue was reported here https://issues.apache.org/jira/browse/DRILL-827

@dvryaboy commented Aug 29, 2014

How were the parquet files created? Any chance you can share them?

@tispratik commented Aug 29, 2014

Found the problem: I was getting the current definitionLevel outside of the for loop for (long i = 0, totalValueCount.... I need to get the current definition level every time I consume():

definitionLevel = columnReader.getCurrentDefinitionLevel();
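
For reference, this is the shape of the corrected inner loop, as a minimal sketch reusing the names from the gist above (the full version is in the updated code):

for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
  // Read the definition level for the current value on every iteration,
  // not once before the loop starts.
  int definitionLevel = columnReader.getCurrentDefinitionLevel();
  if (definitionLevel == maxDefinitionLevel) {
    // read the value for this row, e.g. columnReader.getBinary()
  }
  columnReader.consume(); // advance to the next value
}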

@tispratik commented Aug 29, 2014

Updated the gist; it should work now. By the way, the Parquet files were created through a Sqoop import from MySQL.
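
For anyone who wants to reproduce similar input data, a Sqoop import along these lines writes Parquet output (the connection string, user, table, and target directory below are placeholders for illustration, not the actual values used here):

sqoop import \
  --connect jdbc:mysql://dbhost/dbname \
  --username dbuser \
  --table assigned_conversion \
  --target-dir /data/parquet/assigned_conversion \
  --as-parquetfile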

@tispratik commented Aug 30, 2014

This reader does not re-open the parquet file for every column read.

@ghost commented Nov 17, 2014

Hi Pratik,

I am using CDH5. Could you please tell me the step-by-step procedure for loading a CSV file into a Hive table in Parquet file format?

Thank you.
