@tispratik · Last active March 15, 2017
Parquet columns reader
package com.company.grid.lookup;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
import parquet.io.api.PrimitiveConverter;
import parquet.schema.MessageType;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
// Holds one column's descriptor plus a reflectively allocated array
// (int[], long[], Binary[], ...) sized to the file's total row count.
final class DimColumn {
    private volatile Object arrayList;
    private String name;
    private Class<?> typeOfArray;
    private int totalSize;
    private ColumnDescriptor columnDescriptor;

    DimColumn(ColumnDescriptor columnDescriptor, int totalSize) {
        this.columnDescriptor = columnDescriptor;
        this.name = columnDescriptor.getPath()[0];
        this.typeOfArray = columnDescriptor.getType().javaType;
        this.totalSize = totalSize;
        this.arrayList = Array.newInstance(typeOfArray, totalSize);
    }

    public ColumnDescriptor getColumnDescriptor() {
        return columnDescriptor;
    }

    public Object getArrayList() {
        return arrayList;
    }

    public String getName() {
        return name;
    }

    public int getTotalSize() {
        return totalSize;
    }
}
public class RfiParquetFileReader {
    ParquetMetadata metaData;
    MessageType schema;
    private static final Charset UTF8 = Charset.forName("UTF-8");
    private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();

    public RfiParquetFileReader(String fileName) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", LocalFileSystem.class.getName());
        Path filePath = new Path(fileName);
        metaData = ParquetFileReader.readFooter(conf, filePath);
        schema = metaData.getFileMetaData().getSchema();
        List<BlockMetaData> blocks = metaData.getBlocks();
        // Size the per-column arrays for the whole file: sum the row counts of all
        // row groups, since the loop below reads every row group, not just the first.
        int totalSize = 0;
        for (BlockMetaData block : blocks) {
            totalSize += (int) block.getRowCount();
        }
        List<ColumnDescriptor> columnDescriptors = schema.getColumns();
        List<DimColumn> dimColumns = makeDimColumns(columnDescriptors, totalSize);
        // Open the file once and walk every row group, loading all columns from each.
        ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columnDescriptors);
        PageReadStore pageReadStore = fileReader.readNextRowGroup();
        int index = 0;
        while (pageReadStore != null) {
            ColumnReadStoreImpl columnReadStoreImpl = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
            index = load(dimColumns, columnReadStoreImpl, index);
            pageReadStore = fileReader.readNextRowGroup();
        }
        // Spot-check: print five randomly chosen values from each column.
        Random rand = new Random();
        for (DimColumn dimColumn : dimColumns) {
            System.out.println(dimColumn.getName());
            for (int i = 0; i < 5; i++) {
                int randomIndex = rand.nextInt(totalSize);
                System.out.println("Index: " + randomIndex + " Value: " + Array.get(dimColumn.getArrayList(), randomIndex));
            }
            System.out.println("--------");
        }
    }
    public String getSchema() {
        return schema.toString();
    }
    public static void main(String[] args) {
        String dirName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/";
        // String[] files = {"1.campaigns.parquet", "13.assigned_conversion.parquet"};
        String[] files = {"13.assigned_conversion.parquet"};
        try {
            long startTime = System.currentTimeMillis();
            for (String file : files) {
                new RfiParquetFileReader(dirName + file);
                System.out.println("========================================================================");
            }
            long endTime = System.currentTimeMillis();
            System.out.println("Time taken: " + (endTime - startTime) + "ms");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // Builds one DimColumn (descriptor + backing array) per column in the schema.
    public List<DimColumn> makeDimColumns(List<ColumnDescriptor> columnDescriptors, int totalSize) {
        List<DimColumn> dimColumns = new ArrayList<DimColumn>();
        for (ColumnDescriptor columnDescriptor : columnDescriptors) {
            dimColumns.add(new DimColumn(columnDescriptor, totalSize));
        }
        return dimColumns;
    }
    // Copies every value in the current row group into each column's backing array,
    // starting at startIndex; returns the index just past the last row written.
    public int load(List<DimColumn> dimColumns, ColumnReadStoreImpl columnReadStore, int startIndex) throws IOException {
        int index = startIndex;
        for (DimColumn dc : dimColumns) {
            index = startIndex;
            int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
            ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
            // System.out.println(dc.getTotalSize() + " : " + columnReader.getTotalValueCount() + " - " + dc.getName());
            for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
                // The definition level must be re-read on every iteration: consume()
                // advances it along with the value stream.
                int definitionLevel = columnReader.getCurrentDefinitionLevel();
                if (definitionLevel == maxDefinitionLevel) {
                    // Value is non-null at this row: read it with the accessor for its type.
                    switch (dc.getColumnDescriptor().getType()) {
                        case BINARY:
                            String str = new String(columnReader.getBinary().getBytes(), "UTF-8");
                            System.out.println(index + " : " + dc.getName() + " : " + str); // debug output
                            Array.set(dc.getArrayList(), index, columnReader.getBinary()); break;
                        case BOOLEAN: Array.set(dc.getArrayList(), index, columnReader.getBoolean()); break;
                        case DOUBLE: Array.set(dc.getArrayList(), index, columnReader.getDouble()); break;
                        case FLOAT: Array.set(dc.getArrayList(), index, columnReader.getFloat()); break;
                        case INT32: Array.set(dc.getArrayList(), index, columnReader.getInteger()); break;
                        case INT64: Array.set(dc.getArrayList(), index, columnReader.getLong()); break;
                        case INT96: Array.set(dc.getArrayList(), index, binaryToBigInteger(columnReader.getBinary())); break;
                        // case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
                    }
                }
                columnReader.consume();
                index += 1;
            }
        }
        return index;
    }
    // public static String binaryToString(Binary value) {
    //     byte[] data = value.getBytes();
    //     if (data == null) return null;
    //
    //     try {
    //         CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
    //         return buffer.toString();
    //     } catch (Throwable th) {
    //     }
    //
    //     return "";
    // }
    // Interprets the raw INT96 bytes as a big-endian two's-complement BigInteger.
    public static BigInteger binaryToBigInteger(Binary value) {
        byte[] data = value.getBytes();
        if (data == null) return null;
        return new BigInteger(data);
    }
    // No-op converters: ColumnReadStoreImpl requires a converter tree even though
    // this reader pulls values directly off the ColumnReaders.
    private static final class DumpGroupConverter extends GroupConverter {
        @Override public void start() { }
        @Override public void end() { }
        @Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
    }

    private static final class DumpConverter extends PrimitiveConverter {
        @Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
    }
}
@tispratik (Author):

This version of the reader throws an exception when reading some values in certain columns:

Exception in thread "main" parquet.io.ParquetDecodingException: Can't read value in column [description] BINARY at value 44899 out of 57096, 44899 out of 57096 in currentPage. repetition level: 0, definition level: 1
    at parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:450)
    at parquet.column.impl.ColumnReaderImpl.getBinary(ColumnReaderImpl.java:398)
    at com.rocketfuel.grid.lookup_new.RfiParquetFileReader.load(RfiParquetFileReader.java:147)
    at com.rocketfuel.grid.lookup_new.RfiParquetFileReader.<init>(RfiParquetFileReader.java:87)
    at com.rocketfuel.grid.lookup_new.RfiParquetFileReader.main(RfiParquetFileReader.java:114)
Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
    at parquet.Preconditions.checkArgument(Preconditions.java:47)
    at parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:80)
    at parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:62)
    at parquet.column.values.dictionary.DictionaryValuesReader.readBytes(DictionaryValuesReader.java:82)
    at parquet.column.impl.ColumnReaderImpl$2$6.read(ColumnReaderImpl.java:295)
    at parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:446)
    ... 4 more

@tispratik (Author):

A similar issue was reported here: https://issues.apache.org/jira/browse/DRILL-827

@dvryaboy:

How were the Parquet files created? Any chance you can share them?

@tispratik (Author):

Found the problem: I was getting the current definition level outside of the for loop (for (long i = 0, totalValueCount ...). I need to get the current definition level every time I consume():

    definitionLevel = columnReader.getCurrentDefinitionLevel();
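
For reference, a minimal sketch of the corrected per-column read loop, using the same names as the gist above (this mirrors what the updated load() does):

    ColumnReader columnReader = columnReadStore.getColumnReader(dc.getColumnDescriptor());
    int maxDefinitionLevel = dc.getColumnDescriptor().getMaxDefinitionLevel();
    for (long i = 0, n = columnReader.getTotalValueCount(); i < n; ++i) {
        // Re-read the level on every iteration: consume() advances it with the value stream.
        int definitionLevel = columnReader.getCurrentDefinitionLevel();
        if (definitionLevel == maxDefinitionLevel) {
            // value is non-null at this row: read it with the type-appropriate
            // accessor, e.g. columnReader.getBinary() for a BINARY column
        }
        columnReader.consume();
    }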

@tispratik (Author):

Updated the gist; now it should work. By the way, the Parquet files were created through a Sqoop import from MySQL.
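
For anyone trying to reproduce the input files, a Sqoop import along these lines writes Parquet output. The connection string, credentials, and table name below are placeholders, and --as-parquetfile requires a Sqoop release that supports it:

    sqoop import \
      --connect jdbc:mysql://dbhost/dbname \
      --username dbuser \
      --table assigned_conversion \
      --as-parquetfile \
      --target-dir /data/parquet/assigned_conversion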

@tispratik (Author):

This reader does not re-open the Parquet file for every column read.
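
Concretely, the pattern from the constructor above is: open one ParquetFileReader, then for each row group build a ColumnReadStoreImpl and drain every column from it before advancing:

    ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, columnDescriptors);
    for (PageReadStore rowGroup = fileReader.readNextRowGroup(); rowGroup != null;
         rowGroup = fileReader.readNextRowGroup()) {
        ColumnReadStoreImpl store = new ColumnReadStoreImpl(rowGroup, new DumpGroupConverter(), schema);
        for (ColumnDescriptor column : columnDescriptors) {
            ColumnReader reader = store.getColumnReader(column);
            // ... read reader.getTotalValueCount() values, as in load() ...
        }
    }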

ghost commented Nov 17, 2014:

Hi Pratik,

I am using CDH5. Could you please tell me the step-by-step procedure to insert a CSV file into a Hive table in Parquet file format?

Thank you.
