@alexeygrigorev
Created January 17, 2018 13:45
Use reflection to write arbitrary Java beans to Parquet with Avro

This code shows how to use reflection to write arbitrary Java beans to Parquet files with Apache Avro.

Example:

import com.google.common.collect.Iterables;


ParquetWriterHelper<BeanClass> writer = new ParquetWriterHelper<>(BeanClass.class);
Iterable<List<BeanClass>> batches = Iterables.partition(beans, 300_000);

int cnt = 0;

for (List<BeanClass> batch : batches) {
    String name = String.format("part-%05d.snappy.parquet", cnt);
    writer.write(batch, name);
    cnt++;
}
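
Here BeanClass is not part of the gist; it stands for any ordinary Java bean with plain fields and standard getters. A minimal sketch of what such a bean could look like (hypothetical, for illustration only):

public class BeanClass {
    // plain fields; Avro's ReflectData derives the schema from them
    private String name;
    private int count;
    private long timestamp;

    public BeanClass() {
    }

    public BeanClass(String name, int count, long timestamp) {
        this.name = name;
        this.count = count;
        this.timestamp = timestamp;
    }

    // standard getters, used when values are extracted via java.beans introspection
    public String getName() { return name; }
    public int getCount() { return count; }
    public long getTimestamp() { return timestamp; }
}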

Dependencies to add to pom.xml:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.1.0</version>
</dependency>
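
The usage example above also relies on Guava for Iterables.partition, so a Guava dependency is needed as well if it is not already on the classpath (the version below is only an example, not taken from the original gist):

<!-- Guava, for Iterables.partition in the example; version is an example only -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>23.0</version>
</dependency>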

BeanToRecordConverter.java:

package avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;

import java.util.ArrayList;
import java.util.List;

/**
 * Converts Java beans to Avro GenericRecords, using the schema
 * derived from the bean class via ReflectData.
 */
public class BeanToRecordConverter<E> {

    private final PropertyExtractors extractors = new PropertyExtractors();
    private final Class<?> type;
    private final Schema schema;

    public BeanToRecordConverter(Class<E> type) {
        this.type = type;
        this.schema = ReflectData.get().getSchema(type);
    }

    public BeanToRecordConverter(Class<E> type, Schema schema) {
        this.type = type;
        this.schema = schema;
    }

    public GenericRecord convert(E bean) {
        try {
            return convertBeanToRecord(bean, schema);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("unchecked")
    private GenericRecord convertBeanToRecord(Object bean, Schema schema) throws Exception {
        Class<?> beanClass = bean.getClass();
        PropertyExtractors.PropertyExtractor extractor = extractors.getOrCreate(beanClass);

        GenericRecord result = new GenericData.Record(schema);

        List<Schema.Field> fields = schema.getFields();
        for (Schema.Field field : fields) {
            Schema fieldSchema = field.schema();
            Schema.Type type = fieldSchema.getType();
            String name = field.name();
            Object value = extractor.extract(bean, name);

            if (isSimpleType(type)) {
                result.put(name, value);
                continue;
            }

            if (type.equals(Schema.Type.RECORD)) {
                // nested bean: convert it recursively
                GenericRecord fieldRes = convertBeanToRecord(value, fieldSchema);
                result.put(name, fieldRes);
                continue;
            }

            if (type.equals(Schema.Type.ARRAY)) {
                // let's assume it's always a list
                List<Object> elements = (List<Object>) value;
                Schema elementSchema = fieldSchema.getElementType();
                if (isSimpleType(elementSchema.getType())) {
                    result.put(name, elements);
                    continue;
                }
                List<GenericRecord> results = new ArrayList<>(elements.size());
                for (Object element : elements) {
                    GenericRecord elementRes = convertBeanToRecord(element, elementSchema);
                    results.add(elementRes);
                }
                result.put(name, results);
                continue;
            }
        }

        return result;
    }

    // only String, int and long fields are treated as "simple" here;
    // extend this check if the beans use other field types
    public static boolean isSimpleType(Schema.Type type) {
        if (type.equals(Schema.Type.STRING)) {
            return true;
        }
        if (type.equals(Schema.Type.INT)) {
            return true;
        }
        if (type.equals(Schema.Type.LONG)) {
            return true;
        }
        return false;
    }
}
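
BeanToRecordConverter can also be used on its own to turn a single bean into an Avro GenericRecord. A short sketch, reusing the hypothetical BeanClass from above:

// the schema is derived from the class via ReflectData in the one-argument constructor
BeanToRecordConverter<BeanClass> converter = new BeanToRecordConverter<>(BeanClass.class);
GenericRecord record = converter.convert(new BeanClass("example", 1, System.currentTimeMillis()));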

ParquetWriterHelper.java:

package avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;

/**
 * Writes beans of type E to Snappy-compressed Parquet files,
 * converting them to Avro GenericRecords first.
 */
public class ParquetWriterHelper<E> {

    private static final int BLOCK_SIZE = 1024 * 1024 * 8;
    private static final int PAGE_SIZE = BLOCK_SIZE * 100;

    private final Schema schema;
    private final BeanToRecordConverter<E> converter;
    private final MessageType parquetSchema;
    private final AvroWriteSupport<GenericRecord> support;

    public ParquetWriterHelper(Class<E> cls) {
        this.schema = ReflectData.get().getSchema(cls);
        this.converter = new BeanToRecordConverter<>(cls, schema);
        this.parquetSchema = new AvroSchemaConverter().convert(schema);
        this.support = new AvroWriteSupport<>(parquetSchema, schema);
    }

    public void write(Iterable<E> objects, String file) throws IOException {
        Path path = new Path(file);
        ParquetWriter<GenericRecord> pw = new ParquetWriter<>(path, support,
                CompressionCodecName.SNAPPY, BLOCK_SIZE, PAGE_SIZE);

        for (E e : objects) {
            GenericRecord rec = converter.convert(e);
            pw.write(rec);
        }

        pw.close();
    }
}
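
For a single output file the helper can be used without the batching shown at the top. A minimal sketch, again with the hypothetical BeanClass (java.util.Arrays and java.util.List imports assumed):

// write a small list of beans to one Snappy-compressed Parquet file
List<BeanClass> beans = Arrays.asList(
        new BeanClass("a", 1, 1000L),
        new BeanClass("b", 2, 2000L));

ParquetWriterHelper<BeanClass> helper = new ParquetWriterHelper<>(BeanClass.class);
helper.write(beans, "part-00000.snappy.parquet");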

PropertyExtractors.java:

package avro;

import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/**
 * Caches per-class property extractors built with java.beans introspection.
 */
public class PropertyExtractors {

    public static class PropertyExtractor {
        private final Class<?> type;
        private final Map<String, Method> getters;

        private PropertyExtractor(Class<?> type, Map<String, Method> getters) {
            this.type = type;
            this.getters = getters;
        }

        public Object extract(Object bean, String propertyName) {
            try {
                return getters.get(propertyName).invoke(bean);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Class<?> getType() {
            return type;
        }
    }

    private final Map<Class<?>, PropertyExtractor> cache = new HashMap<>();

    public PropertyExtractor getOrCreate(Class<?> type) {
        if (cache.containsKey(type)) {
            return cache.get(type);
        }
        PropertyExtractor extractor = forClass(type);
        cache.put(type, extractor);
        return extractor;
    }

    public static PropertyExtractor forClass(Class<?> type) {
        try {
            return forClassNotSafe(type);
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
    }

    private static PropertyExtractor forClassNotSafe(Class<?> type) throws IntrospectionException {
        BeanInfo info = Introspector.getBeanInfo(type);
        PropertyDescriptor[] properties = info.getPropertyDescriptors();

        Map<String, Method> getters = new HashMap<>();
        for (PropertyDescriptor pd : properties) {
            String name = pd.getName();
            if ("class".equals(name)) {
                // skip the getClass() pseudo-property reported by the Introspector
                continue;
            }
            Method getter = pd.getReadMethod();
            if (getter == null) {
                continue;
            }
            getters.put(name, getter);
        }

        return new PropertyExtractor(type, getters);
    }
}
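
PropertyExtractors is a thin cache around java.beans introspection and can also be used directly. A small sketch, with the hypothetical BeanClass:

// look up the getters once, then invoke them reflectively by property name
PropertyExtractors.PropertyExtractor extractor = PropertyExtractors.forClass(BeanClass.class);
Object name = extractor.extract(new BeanClass("example", 1, 0L), "name"); // returns "example"
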
@altre

altre commented Sep 21, 2018

Hi, this is very handy and basically the thing I just wanted to write.
You do not seem to have a license for your gists anywhere. Could you specify the license?

@leela-maheshwar

org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/lrao/scanwork/a423448b-fc42-46b6-a0f6-88f10fcdb653/a423448b-fc42-46b6-a0f6-88f10fcdb653.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:129)

java.lang.ClassCastException: com.sherlock.dao.ScanResultsRow cannot be cast to org.apache.avro.generic.IndexedRecord
at org.apache.avro.generic.GenericData.setField(GenericData.java:569)
at org.apache.parquet.avro.AvroRecordConverter.set(AvroRecordConverter.java:295)
at org.apache.parquet.avro.AvroRecordConverter$1.add(AvroRecordConverter.java:109)
at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary(AvroConverters.java:62)
at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:323)
at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:129)

I get the above errors. Any idea what I'm missing?

@leela-maheshwar

The above issue occurs while reading back the Parquet file that was written earlier.

@jabhijeet

Getting the error below for a LocalDate field:

org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group
