Skip to content

Instantly share code, notes, and snippets.

@cerisier
Last active November 18, 2022 21:42
Show Gist options
  • Save cerisier/9748d493bbcc301384aec52216601cb8 to your computer and use it in GitHub Desktop.
Save cerisier/9748d493bbcc301384aec52216601cb8 to your computer and use it in GitHub Desktop.
Hackish example of writing Protobuf `DynamicMessage` objects to Parquet via Avro using `AvroParquetWriter`, with a custom field option that marks `int64` fields as Parquet timestamps.
package org.example
import com.google.protobuf.Descriptors
import com.google.protobuf.DynamicMessage
import com.google.protobuf.MessageOrBuilder
import org.apache.avro.LogicalTypes
import org.apache.avro.Schema
import org.apache.avro.protobuf.ProtobufData
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.example.proto.MyProtobufMessageProto.MyProtobufMessage
// Custom FieldOptions specifying which parquet logical types to apply upon schema transformation
import org.example.proto.Transform
import org.example.proto.Transform.ParquetAnnotation
import java.util.concurrent.ConcurrentHashMap
/**
 * Avro [ProtobufData] model for records that are [com.google.protobuf.DynamicMessage]s
 * (i.e. no generated message classes), honouring the custom `(parquet)` field option.
 *
 * parquet-avro reads and writes fields through the 3-argument [getField]/[setField];
 * [ProtobufData] needs a per-record "state" (field descriptors by position) for those, so
 * both entry points are routed to the state-aware overloads here.
 */
class DynamicProtobufData : ProtobufData() {
    // Avro schema per message descriptor. ProtobufData.getSchema(Descriptor) rebuilds the
    // entire record schema on each call, and getField/setField run once per field per
    // record, so the result is memoised instead of being recomputed on every access.
    private val schemaCache = ConcurrentHashMap<Descriptors.Descriptor, Schema>()

    override fun getField(r: Any?, name: String?, pos: Int): Any? =
        getField(r, name, pos, recordState(r))

    override fun setField(r: Any?, n: String?, pos: Int, value: Any?) {
        setField(r, n, pos, value, recordState(r))
    }

    /**
     * Returns the base Avro schema for [f], tagged with the `timestamp-millis` logical
     * type when the field carries `[(parquet) = { type: TIMESTAMP }]`.
     *
     * @throws IllegalArgumentException (from Avro) if a TIMESTAMP-annotated field is not
     *   backed by a long, since timestamp-millis only applies to longs.
     */
    override fun getSchema(f: Descriptors.FieldDescriptor): Schema {
        val schema = super.getSchema(f)
        if (f.options.hasExtension(Transform.parquet) &&
            f.options.getExtension(Transform.parquet).type == ParquetAnnotation.Annotation.TIMESTAMP
        ) {
            // Repeated fields come back as ARRAY<long>: the logical type belongs on the
            // element schema, and adding it to the array itself would fail validation.
            val target = if (schema.type == Schema.Type.ARRAY) schema.elementType else schema
            LogicalTypes.timestampMillis().addToSchema(target)
        }
        return schema
    }

    /**
     * Resolves ProtobufData's record state for [r]. The record is a message when writing
     * and a builder when reading; both expose their descriptor via [MessageOrBuilder].
     */
    private fun recordState(r: Any?): Any? {
        val descriptor = (r as MessageOrBuilder).descriptorForType
        return getRecordState(r, schemaCache.getOrPut(descriptor) { getSchema(descriptor) })
    }
}
/**
 * Parses a sample [MyProtobufMessage] as a [DynamicMessage], prints the Avro schema that
 * [DynamicProtobufData] derives for it, and writes the message to `avro.parquet`.
 */
fun main() {
    // The descriptor could equally be built at runtime from a DescriptorProto.
    val descriptor = MyProtobufMessage.getDescriptor()
    // In a real pipeline the serialized message could arrive directly as a byte array.
    val protoBytes = MyProtobufMessage.newBuilder().setCreatedAt(0xdeadbeef).build().toByteArray()
    val message = DynamicMessage.parseFrom(descriptor, protoBytes)

    val model = DynamicProtobufData()
    val schema = model.getSchema(message.descriptorForType)
    println(schema)

    val writer: ParquetWriter<DynamicMessage> =
        AvroParquetWriter.builder<DynamicMessage>(Path("avro.parquet"))
            .withDataModel(model) // use the protobuf data model
            .withSchema(schema) // Avro schema for the protobuf data
            .build()
    // `use` closes the writer even when write() throws, instead of leaking the open file.
    writer.use { it.write(message) }
}
syntax = "proto3";
package org.example.proto;
option java_package = "org.example.proto";
option java_outer_classname = "MyProtobufMessageProto";
import "transform.proto";
// Sample message written to Parquet by the Kotlin example.
message MyProtobufMessage {
  // The custom option is the `parquet` extension declared in transform.proto. With
  // TIMESTAMP, DynamicProtobufData tags this long as Avro timestamp-millis, i.e.
  // milliseconds since the Unix epoch.
  int64 created_at = 1 [(parquet) = { type: TIMESTAMP }];
}
// Custom field option telling the Kotlin schema conversion which Parquet/Avro logical
// type to apply to a field.
syntax = "proto3";
package org.example.proto;
import "google/protobuf/descriptor.proto";
// Payload of the custom `(parquet)` field option.
message ParquetAnnotation {
// Logical type to attach to the annotated field's Avro schema.
enum Annotation {
// proto3 zero/default value: no logical type is applied.
EMPTY = 0;
// Apply Avro timestamp-millis; the field must map to an Avro long (e.g. int64).
TIMESTAMP = 1;
}
Annotation type = 1;
}
// Makes `parquet` usable as a custom option on any field declaration, e.g.
// `int64 created_at = 1 [(parquet) = { type: TIMESTAMP }];`
// NOTE(review): 91100 is in the 50000-99999 range reserved for in-house options; register
// a number with the protobuf global extension registry before publishing this schema.
extend google.protobuf.FieldOptions {
ParquetAnnotation parquet = 91100;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment