Created January 17, 2018 13:45
Use reflection to write arbitrary java beans to parquet with Avro

This code shows how to use reflection to write arbitrary Java beans to Parquet files with Apache Avro.



// Usage example: write a collection of beans as a series of numbered Parquet part files.
ParquetWriterHelper<BeanClass> writer = new ParquetWriterHelper<>(BeanClass.class);
// Split the input into batches of at most 300,000 beans; each batch becomes one file.
// NOTE(review): Iterables is presumably Guava's com.google.common.collect.Iterables - confirm.
Iterable<List<BeanClass>> batches = Iterables.partition(beans, 300_000);

int cnt = 0;

for (List<BeanClass> batch : batches) {
    String name = String.format("part-%05d.snappy.parquet", cnt);
    writer.write(batch, name);
    // Advance the part number so the next batch gets its own file name
    // instead of reusing "part-00000.snappy.parquet".
    cnt++;
}

Dependencies to add to pom.xml:

package avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import java.util.ArrayList;
import java.util.List;
public class BeanToRecordConverter<E> {
private final PropertyExtractors extractors = new PropertyExtractors();
private final Class<?> type;
private final Schema schema;
public BeanToRecordConverter(Class<E> type) {
this.type = type;
this.schema = ReflectData.get().getSchema(type);
public BeanToRecordConverter(Class<E> type, Schema schema) {
this.type = type;
this.schema = schema;
public GenericRecord convert(E bean) {
try {
return convertBeanToRecord(bean, schema);
} catch (Exception e) {
throw new RuntimeException(e);
private GenericRecord convertBeanToRecord(Object bean, Schema schema) throws Exception {
Class<?> beanClass = bean.getClass();
PropertyExtractors.PropertyExtractor extractor = extractors.getOrCreate(beanClass);
GenericRecord result = new GenericData.Record(schema);
List<Schema.Field> fields = schema.getFields();
for (Schema.Field field : fields) {
Schema fieldSchema = field.schema();
Schema.Type type = fieldSchema.getType();
String name =;
Object value = extractor.extract(bean, name);
if (isSimpleType(type)) {
result.put(name, value);
if (type.equals(Schema.Type.RECORD)) {
GenericRecord fieldRes = convertBeanToRecord(value, fieldSchema);
result.put(name, fieldRes);
if (type.equals(Schema.Type.ARRAY)) {
// let's assume it's always list
List<Object> elements = (List<Object>) value;
Schema elementSchema = fieldSchema.getElementType();
if (isSimpleType(elementSchema.getType())) {
result.put(name, elements);
List<GenericRecord> results = new ArrayList<>(elements.size());
for (Object element : elements) {
GenericRecord elementRes = convertBeanToRecord(element, elementSchema);
result.put(name, results);
return result;
public static boolean isSimpleType(Schema.Type type) {
if (type.equals(Schema.Type.STRING)) {
return true;
if (type.equals(Schema.Type.INT)) {
return true;
if (type.equals(Schema.Type.LONG)) {
return true;
return false;
package avro;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
public class ParquetWriterHelper<E> {
private static final int BLOCK_SIZE = 1024 * 1024 * 8;
private static final int PAGE_SIZE = BLOCK_SIZE * 100;
private final Schema schema;
private final BeanToRecordConverter<E> converter;
private final MessageType parquetSchema;
private final AvroWriteSupport<GenericRecord> support;
public ParquetWriterHelper(Class<E> cls) {
this.schema = ReflectData.get().getSchema(cls);
this.converter = new BeanToRecordConverter<>(cls, schema);
this.parquetSchema = new AvroSchemaConverter().convert(schema); = new AvroWriteSupport<>(parquetSchema, schema);
public void write(Iterable<E> objects, String file) throws IOException {
Path path = new Path(file);
ParquetWriter<GenericRecord> pw = new ParquetWriter<>(path, support,
CompressionCodecName.SNAPPY, BLOCK_SIZE, PAGE_SIZE);
for (E e : objects) {
GenericRecord rec = converter.convert(e);
package avro;
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
 * Creates and caches {@link PropertyExtractor}s, which read bean properties by name
 * through their getter methods.
 *
 * <p>The cache is a plain {@link HashMap}, so a single instance must not be shared between
 * threads without external synchronization.
 */
public class PropertyExtractors {

    /**
     * Reads the readable properties of one bean class via reflection.
     */
    public static class PropertyExtractor {
        private final Class<?> type;
        private final Map<String, Method> getters;

        private PropertyExtractor(Class<?> type, Map<String, Method> getters) {
            this.type = type;
            this.getters = getters;
        }

        /**
         * Reads one property of the given bean.
         *
         * @param bean bean to read from
         * @param propertyName name of the property as reported by the bean introspector
         * @return the value returned by the property's getter
         * @throws IllegalArgumentException if the bean class has no readable property
         *         called {@code propertyName}
         * @throws RuntimeException wrapping any failure raised while invoking the getter
         */
        public Object extract(Object bean, String propertyName) {
            Method getter = getters.get(propertyName);
            if (getter == null) {
                // Fail with a readable message instead of a wrapped NullPointerException.
                throw new IllegalArgumentException(
                        "No readable property '" + propertyName + "' in " + type.getName());
            }

            try {
                return getter.invoke(bean);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * @return the bean class this extractor was created for
         */
        public Class<?> getType() {
            return type;
        }
    }

    private final Map<Class<?>, PropertyExtractor> cache = new HashMap<>();

    /**
     * Returns the extractor for {@code type}, creating and caching it on first use.
     *
     * @param type bean class
     * @return the extractor cached for {@code type}
     */
    public PropertyExtractor getOrCreate(Class<?> type) {
        PropertyExtractor extractor = cache.get(type);
        if (extractor == null) {
            extractor = forClass(type);
            cache.put(type, extractor);
        }
        return extractor;
    }

    /**
     * Creates a new, uncached extractor for {@code type}.
     *
     * @param type bean class to introspect
     * @return extractor covering all readable properties of {@code type}
     * @throws RuntimeException wrapping the {@link IntrospectionException} if the class
     *         cannot be introspected
     */
    public static PropertyExtractor forClass(Class<?> type) {
        try {
            return forClassNotSafe(type);
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Collects the getter of every readable property of {@code type}.
     *
     * @param type bean class to introspect
     * @return extractor backed by the collected getters
     * @throws IntrospectionException if the bean introspection fails
     */
    private static PropertyExtractor forClassNotSafe(Class<?> type) throws IntrospectionException {
        BeanInfo info = Introspector.getBeanInfo(type);
        PropertyDescriptor[] properties = info.getPropertyDescriptors();

        Map<String, Method> getters = new HashMap<>();

        for (PropertyDescriptor pd : properties) {
            String name = pd.getName();
            if ("class".equals(name)) {
                // "class" comes from Object.getClass() and is not a real bean property.
                continue;
            }

            Method getter = pd.getReadMethod();
            if (getter == null) {
                // Write-only property: there is nothing to read.
                continue;
            }

            getters.put(name, getter);
        }

        return new PropertyExtractor(type, getters);
    }
}
The above issue occurs when reading a Parquet file that was serialized earlier.

I am getting the error below for a LocalDate field:

org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group

