Skip to content

Instantly share code, notes, and snippets.

@tispratik
Created August 28, 2014 19:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tispratik/f7a66f6a40b7ae3b98ad to your computer and use it in GitHub Desktop.
Save tispratik/f7a66f6a40b7ae3b98ad to your computer and use it in GitHub Desktop.
Parquet columns reader2
package com.company.grid.lookup_new;
import parquet.column.ColumnDescriptor;
import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
import parquet.io.api.PrimitiveConverter;
import parquet.schema.MessageType;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
/**
 * Holder for one column's values, backed by a reflectively allocated array.
 *
 * <p>The backing array is created with {@link Array#newInstance(Class, int)}, so
 * {@code typeOfArray} may be a primitive class (for example {@code int.class}),
 * in which case {@link #arrayList} is a primitive array.
 */
final class DimColumn {

    /** Backing array with component type {@link #typeOfArray} and length {@link #size}. */
    public volatile Object arrayList;

    /** Name given to this column by the creator. */
    public String name;

    /** Component type the backing array was allocated with. */
    public Class<?> typeOfArray;

    /** Number of slots the backing array was allocated with. */
    public int size;

    /**
     * Allocates the backing array and records the column's description.
     *
     * @param name        column name
     * @param typeOfArray component type of the backing array
     * @param size        number of slots to allocate
     */
    DimColumn(String name, Class<?> typeOfArray, int size) {
        final Object storage = Array.newInstance(typeOfArray, size);
        this.arrayList = storage;
        this.size = size;
        this.typeOfArray = typeOfArray;
        this.name = name;
    }
}
public class RfiParquetFileReader {
ParquetMetadata metaData;
MessageType schema;
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final CharsetDecoder UTF8_DECODER = UTF8.newDecoder();
public RfiParquetFileReader(String fileName) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
conf.set("fs.file.impl", LocalFileSystem.class.getName());
Path filePath = new Path(fileName);
metaData = ParquetFileReader.readFooter(conf, filePath);
schema = metaData.getFileMetaData().getSchema();
List<BlockMetaData> blocks;
blocks = metaData.getBlocks();
long totalSize = blocks.get(0).getRowCount();
List<ColumnDescriptor> columns = schema.getColumns();
List<DimColumn> dimColumns = new ArrayList<DimColumn>();
for (ColumnDescriptor columnDescriptor : columns) {
System.out.println(columnDescriptor.toString());
DimColumn dimColumn = new DimColumn(columnDescriptor.getPath()[0], columnDescriptor.getType().javaType, (int) totalSize);
int index = 0;
ParquetFileReader fileReader = new ParquetFileReader(conf, filePath, blocks, schema.getColumns());
PageReadStore pageReadStore = fileReader.readNextRowGroup();
while (pageReadStore != null) {
ColumnReadStoreImpl columnReadStoreImpl = new ColumnReadStoreImpl(pageReadStore, new DumpGroupConverter(), schema);
index = load(columnReadStoreImpl, columnDescriptor, dimColumn, index);
pageReadStore = fileReader.readNextRowGroup();
}
dimColumns.add(dimColumn);
}
Random rand = new Random();
int index;
for (DimColumn dimColumn : dimColumns) {
System.out.println(dimColumn.name);
for(int i = 0; i < 5; i++) {
index = rand.nextInt((int) totalSize);
System.out.println("Index: " + index + " Value: " + Array.get(dimColumn.arrayList, index));
}
System.out.println("--------");
}
}
public String getSchema() {
return schema.toString();
}
public static void main(String[] args) {
String fileName = "/Users/pkhadloya/Downloads/AVRO/parquet_files/13.assigned_conversion.parquet";
try {
long startTime = System.currentTimeMillis();
new RfiParquetFileReader(fileName);
long endTime = System.currentTimeMillis();
System.out.println("Time taken: " + (endTime - startTime) + "ms");
} catch (IOException e) {
e.printStackTrace();
}
}
public static int load(ColumnReadStoreImpl columnReadStore, ColumnDescriptor column, DimColumn dimColumn, int index) throws IOException {
int maxDefinitionLevel = column.getMaxDefinitionLevel();
ColumnReader columnReader = columnReadStore.getColumnReader(column);
for (long i = 0, totalValueCount = columnReader.getTotalValueCount(); i < totalValueCount; ++i) {
int definitionLevel = columnReader.getCurrentDefinitionLevel();
if (definitionLevel == maxDefinitionLevel) {
switch (column.getType()) {
case BINARY: Array.set(dimColumn.arrayList, index, columnReader.getBinary()); break;
case BOOLEAN: Array.set(dimColumn.arrayList, index, columnReader.getBoolean()); break;
case DOUBLE: Array.set(dimColumn.arrayList, index, columnReader.getDouble()); break;
case FLOAT: Array.set(dimColumn.arrayList, index, columnReader.getFloat()); break;
case INT32: Array.set(dimColumn.arrayList, index, columnReader.getInteger()); break;
case INT64: Array.set(dimColumn.arrayList, index, columnReader.getLong()); break;
case INT96: Array.set(dimColumn.arrayList, index, binaryToBigInteger(columnReader.getBinary())); break;
// case FIXED_LEN_BYTE_ARRAY: out.format("%s", binaryToString(columnReader.getBinary())); break;
}
}
columnReader.consume();
index += 1;
}
return index;
}
// public static String binaryToString(Binary value) {
// byte[] data = value.getBytes();
// if (data == null) return null;
//
// try {
// CharBuffer buffer = UTF8_DECODER.decode(value.toByteBuffer());
// return buffer.toString();
// } catch (Throwable th) {
// }
//
// return "";
// }
public static BigInteger binaryToBigInteger(Binary value) {
byte[] data = value.getBytes();
if (data == null) return null;
return new BigInteger(data);
}
private static final class DumpGroupConverter extends GroupConverter {
@Override public void start() { }
@Override public void end() { }
@Override public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
}
private static final class DumpConverter extends PrimitiveConverter {
@Override public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
}
}
@tispratik
Copy link
Author

This reader re-opens the parquet file every time it reads a new column. An optimized version is https://gist.github.com/tispratik/f0044dd84dc8d8c6cbcf

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment