@animeshtrivedi
Last active March 8, 2022 12:04
Example program to convert Apache Parquet data to Apache Arrow
/* This code snippet is a part of the blog at
https://github.com/animeshtrivedi/blog/blob/master/post/2017-12-26-arrow.md
*/
import com.google.common.collect.ImmutableList;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.util.List;
/**
* Created by atr on 19.12.17.
*/
class DumpConverter extends PrimitiveConverter {
@Override
public GroupConverter asGroupConverter() { return new DumpGroupConverter(); }
}
class DumpGroupConverter extends GroupConverter {
@Override
public void start() {}
@Override
public void end() {}
@Override
public Converter getConverter(int fieldIndex) { return new DumpConverter(); }
}
public class ParquetToArrow {
private Configuration conf;
private MessageType parquetSchema;
private ParquetFileReader parquetFileReader;
private ParquetMetadata parquetFooter;
private Path arrowPath;
private Schema arrowSchema;
private VectorSchemaRoot arrowVectorSchemaRoot;
private ArrowFileWriter arrowFileWriter;
private RootAllocator ra = null;
public ParquetToArrow(){
this.conf = new Configuration();
this.ra = new RootAllocator(Integer.MAX_VALUE);
this.arrowPath = new Path("/arrowOutput/");
}
public void setParquetInputFile(String parquetFile) throws Exception {
Path parquetFilePath = new Path(parquetFile);
this.parquetFooter = ParquetFileReader.readFooter(conf,
parquetFilePath,
ParquetMetadataConverter.NO_FILTER);
FileMetaData mdata = this.parquetFooter.getFileMetaData();
this.parquetSchema = mdata.getSchema();
this.parquetFileReader = new ParquetFileReader(conf,
mdata,
parquetFilePath,
this.parquetFooter.getBlocks(),
this.parquetSchema.getColumns());
makeArrowSchema();
setArrowFileWriter(convertParquetToArrowFileName(parquetFilePath));
}
private String convertParquetToArrowFileName(Path parquetNamePath){
String oldSuffix = ".parquet";
String newSuffix = ".arrow";
String fileName = parquetNamePath.getName();
if (!fileName.endsWith(oldSuffix)) {
return fileName + newSuffix;
}
return fileName.substring(0, fileName.length() - oldSuffix.length()) + newSuffix;
}
private void makeArrowSchema() throws Exception {
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
StringBuilder sb = new StringBuilder();
for(ColumnDescriptor col: this.parquetSchema.getColumns()){
sb.setLength(0);
String[] p = col.getPath();
for(String px: p)
sb.append(px);
switch (col.getType()) {
case INT32 :
childrenBuilder.add(new Field(sb.toString(),
FieldType.nullable(new ArrowType.Int(32, true)), null));
break;
case INT64 :
childrenBuilder.add(new Field(sb.toString(),
FieldType.nullable(new ArrowType.Int(64, true)), null));
break;
case DOUBLE :
childrenBuilder.add(new Field(sb.toString(),
FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null));
break;
case BINARY :
childrenBuilder.add(new Field(sb.toString(),
FieldType.nullable(new ArrowType.Binary()), null));
break;
// FLOAT (and the remaining Parquet primitive types) are not handled in this example
default : throw new Exception(" NYI " + col.getType());
}
}
this.arrowSchema = new Schema(childrenBuilder.build(), null);
//System.out.println("Arrow Schema is " + this.arrowSchema.toString());
}
private void setArrowFileWriter(String arrowFileName) throws Exception{
String arrowFullPath = this.arrowPath.toUri().toString() + "/" + arrowFileName;
System.out.println("Creating a file with name : " + arrowFullPath);
// create the file stream on HDFS
Path path = new Path(arrowFullPath);
FileSystem fs = FileSystem.get(path.toUri(), conf);
// default is to over-write
FSDataOutputStream file = fs.create(new Path(path.toUri().getRawPath()));
this.arrowVectorSchemaRoot = VectorSchemaRoot.create(this.arrowSchema, this.ra);
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
boolean writeLocalFile = false;
if(writeLocalFile) {
File arrowFile = new File("./" + arrowFileName);
FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
this.arrowFileWriter = new ArrowFileWriter(this.arrowVectorSchemaRoot,
provider,
fileOutputStream.getChannel());
} else {
/* use HDFS files; HDFSWritableByteChannel (from the accompanying blog post,
see the sketch at the end of this class) adapts the HDFS output stream to the
WritableByteChannel interface that ArrowFileWriter expects */
this.arrowFileWriter = new ArrowFileWriter(this.arrowVectorSchemaRoot,
provider,
new HDFSWritableByteChannel(file));
}
}
public void process() throws Exception {
PageReadStore pageReadStore = null;
List<ColumnDescriptor> colDesc = parquetSchema.getColumns();
List<FieldVector> fieldVectors = this.arrowVectorSchemaRoot.getFieldVectors();
int size = colDesc.size();
DumpGroupConverter conv = new DumpGroupConverter();
this.arrowFileWriter.start();
pageReadStore = parquetFileReader.readNextRowGroup();
while (pageReadStore != null) {
ColumnReadStoreImpl colReader = new ColumnReadStoreImpl(pageReadStore, conv,
this.parquetSchema, this.parquetFooter.getFileMetaData().getCreatedBy());
if(pageReadStore.getRowCount() > Integer.MAX_VALUE)
throw new Exception(" More than Integer.MAX_VALUE is not supported " + pageReadStore.getRowCount());
int rows = (int) pageReadStore.getRowCount();
// this batch of Arrow contains these many records
this.arrowVectorSchemaRoot.setRowCount(rows);
int i = 0;
while (i < size){
ColumnDescriptor col = colDesc.get(i);
switch(col.getType()) {
case INT32: writeIntColumn(colReader.getColumnReader(col),
col.getMaxDefinitionLevel(),
fieldVectors.get(i),
rows);
break;
case INT64: writeLongColumn(colReader.getColumnReader(col),
col.getMaxDefinitionLevel(),
fieldVectors.get(i),
rows);
break;
case DOUBLE: writeDoubleColumn(colReader.getColumnReader(col),
col.getMaxDefinitionLevel(),
fieldVectors.get(i),
rows);
break;
case BINARY: writeBinaryColumn(colReader.getColumnReader(col),
col.getMaxDefinitionLevel(),
fieldVectors.get(i),
rows);
break;
default : throw new Exception(" NYI " + col.getType());
}
i+=1;
}
// write out the batch for the row group we just converted, then fetch the next one
this.arrowFileWriter.writeBatch();
pageReadStore = parquetFileReader.readNextRowGroup();
}
this.arrowFileWriter.end();
this.arrowFileWriter.close();
}
private void writeIntColumn(ColumnReader creader, int dmax, FieldVector fieldVector, int rows) throws Exception {
IntVector intVector = (IntVector) fieldVector;
intVector.setInitialCapacity(rows);
intVector.allocateNew();
for(int i = 0; i < rows; i++) {
if(creader.getCurrentDefinitionLevel() == dmax){
intVector.setSafe(i, 1, creader.getInteger());
} else {
intVector.setNull(i);
}
creader.consume();
}
intVector.setValueCount(rows);
}
private void writeLongColumn(ColumnReader creader, int dmax, FieldVector fieldVector, int rows) throws Exception {
BigIntVector bigIntVector = (BigIntVector) fieldVector;
bigIntVector.setInitialCapacity(rows);
bigIntVector.allocateNew();
for(int i = 0; i < rows; i++) {
if(creader.getCurrentDefinitionLevel() == dmax){
bigIntVector.setSafe(i, 1, creader.getLong());
} else {
bigIntVector.setNull(i);
}
creader.consume();
}
bigIntVector.setValueCount(rows);
}
private void writeDoubleColumn(ColumnReader creader, int dmax, FieldVector fieldVector, int rows) throws Exception {
Float8Vector float8Vector = (Float8Vector ) fieldVector;
float8Vector.setInitialCapacity(rows);
float8Vector.allocateNew();
for(int i = 0; i < rows; i++) {
if(creader.getCurrentDefinitionLevel() == dmax){
float8Vector.setSafe(i, 1, creader.getDouble());
} else {
float8Vector.setNull(i);
}
creader.consume();
}
float8Vector.setValueCount(rows);
}
private void writeBinaryColumn(ColumnReader creader, int dmax, FieldVector fieldVector, int rows) throws Exception {
VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
varBinaryVector.setInitialCapacity(rows);
varBinaryVector.allocateNew();
for(int i = 0; i < rows; i++) {
if(creader.getCurrentDefinitionLevel() == dmax){
byte[] data = creader.getBinary().getBytes();
varBinaryVector.setIndexDefined(i);
varBinaryVector.setValueLengthSafe(i, data.length);
varBinaryVector.setSafe(i, data);
} else {
varBinaryVector.setNull(i);
}
creader.consume();
}
varBinaryVector.setValueCount(rows);
}
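/* NOTE: HDFSWritableByteChannel is not defined in this gist; it comes from the
accompanying blog post. The nested class below is only a sketch (an assumption,
not the author's exact code) of such an adapter: it wraps the HDFS
FSDataOutputStream into the java.nio.channels.WritableByteChannel that
ArrowFileWriter expects. */
private static class HDFSWritableByteChannel implements java.nio.channels.WritableByteChannel {
private final FSDataOutputStream out;
private boolean open = true;
HDFSWritableByteChannel(FSDataOutputStream out) {
this.out = out;
}
@Override
public int write(java.nio.ByteBuffer src) throws java.io.IOException {
// drain the buffer into the HDFS stream and report how many bytes were written
int bytes = src.remaining();
byte[] tmp = new byte[bytes];
src.get(tmp);
this.out.write(tmp);
return bytes;
}
@Override
public boolean isOpen() {
return this.open;
}
@Override
public void close() throws java.io.IOException {
this.out.close();
this.open = false;
}
}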
}
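For completeness, a minimal driver is sketched below. It is hypothetical and not part of the original gist (the blog post linked above shows the full setup); it assumes the input path is passed as the first command-line argument.

public class ParquetToArrowExample {
    public static void main(String[] args) throws Exception {
        // args[0] is the path of the input Parquet file, e.g. "/input/data.parquet"
        ParquetToArrow converter = new ParquetToArrow();
        converter.setParquetInputFile(args[0]);
        converter.process();
    }
}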
@dsomesh commented Mar 7, 2022

@animeshtrivedi thanks for the great example. One question though: in the method "setArrowFileWriter", the way the local file writer is created, the old contents of the file will be overwritten. What change is needed so that appending works as well? In my local testing, I tried creating the FileOutputStream like FileOutputStream(fileName, true), but it did not seem to work.

@animeshtrivedi (Author) commented
hi @dsomesh - I don't think I have tried appending to an Arrow file. That would be a bit more complicated, as you have to make sure that the schema matches and that the dictionary and other metadata are set up properly when appending a new data stream to an existing Arrow file. I am sure it can be done, but I do not know off the top of my head how to set it up. It is definitely not as simple as just appending to a file. I would recommend (i) checking the source code examples for how to initialize the Arrow file writer class ArrowFileWriter with an existing data file and schema - for example, you could open an existing file with an ArrowReader and then initialize the writer from the reader's values (see the sketch below); and/or (ii) asking on the Arrow user mailing list. Sorry, I don't have a clear answer for you here.
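For reference, a rough sketch of that read-then-rewrite approach (hypothetical; the class and file names are assumptions, it uses local files for simplicity, and it ignores dictionary-encoded columns) could look like this: open the existing file with ArrowFileReader, copy its record batches into a new file through an ArrowFileWriter that shares the reader's VectorSchemaRoot, then write the additional batches.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;

public class ArrowAppendSketch {
    public static void main(String[] args) throws Exception {
        RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
        // open the existing Arrow file and reuse its schema and vectors
        try (ArrowFileReader reader = new ArrowFileReader(
                new FileInputStream(new File("old.arrow")).getChannel(), allocator);
             FileOutputStream out = new FileOutputStream(new File("new.arrow"))) {
            VectorSchemaRoot root = reader.getVectorSchemaRoot();
            ArrowFileWriter writer = new ArrowFileWriter(root,
                    new DictionaryProvider.MapDictionaryProvider(),
                    out.getChannel());
            writer.start();
            // copy every existing record batch into the new file
            while (reader.loadNextBatch()) {
                writer.writeBatch();
            }
            // ... fill 'root' with the new rows here and call writer.writeBatch() again ...
            writer.end();
            writer.close();
        }
    }
}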
