Skip to content

Instantly share code, notes, and snippets.

@Chuckame
Created January 18, 2024 17:12
Show Gist options
  • Save Chuckame/f08e33d9f45fb47f6f431ccf1872d755 to your computer and use it in GitHub Desktop.
Save Chuckame/f08e33d9f45fb47f6f431ccf1872d755 to your computer and use it in GitHub Desktop.
Kafka Avro serializer and deserializer using jackson-avro-module, compatible with the Confluent Avro serializers. Useful when you don't want to generate classes using old plugins and prefer to rely only on plain Java classes.
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import lombok.SneakyThrows;
import org.apache.avro.Schema;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Map;
/**
 * Kafka {@link Deserializer} that decodes Confluent-framed Avro payloads using a Jackson
 * {@link ObjectReader} instead of generated Avro classes.
 *
 * <p>The payload layout handled here is: one magic byte ({@code 0x0}), a 4-byte schema id
 * (big-endian, as read by {@link ByteBuffer}), followed by the Avro-encoded body. The writer
 * schema is looked up in the schema registry by subject and id, and combined with the reader
 * schema derived from the target Java type.
 *
 * @param <T> the Java type produced by {@link #deserialize(String, byte[])}
 */
public class JacksonConfluentAvroDeserializer<T> extends AbstractKafkaAvroDeserializer implements Deserializer<T> {
    private static final int MAGIC_BYTE = 0x0;
    /** Offset of the schema id: it sits right after the single magic byte. */
    private static final int SCHEMA_ID_OFFSET = 1;
    /** Total header length (magic byte + schema id) preceding the Avro body. */
    private static final int HEADER_SIZE = SCHEMA_ID_OFFSET + Integer.BYTES;

    protected final ObjectReader avroReader;
    protected final AvroSchema readerSchema;
    protected boolean isKey;

    /**
     * Creates a deserializer for the given target type.
     *
     * @param avroMapper     mapper used to build the reader and to derive the reader schema
     * @param serializedType Java class the payloads are decoded into
     */
    @SneakyThrows
    public JacksonConfluentAvroDeserializer(AvroMapper avroMapper, Class<T> serializedType) {
        this.avroReader = avroMapper.readerFor(serializedType);
        this.readerSchema = avroMapper.schemaFor(serializedType);
    }

    /**
     * Configures the underlying Confluent deserializer and records whether this instance
     * handles record keys or values (used when computing the subject name).
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        super.configure(new KafkaAvroDeserializerConfig(configs));
        this.isKey = isKey;
    }

    /**
     * Decodes a Confluent-framed Avro payload into an instance of {@code T}.
     *
     * @param topic topic the record was read from
     * @param data  raw record bytes, may be {@code null}
     * @return the decoded value, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the magic byte is missing or the payload is too short
     *                                to contain the schema id
     */
    @Override
    @SneakyThrows
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length == 0 || data[0] != MAGIC_BYTE) {
            throw new SerializationException("Missing magic byte");
        }
        if (data.length < HEADER_SIZE) {
            // Fail with a serialization error instead of an IndexOutOfBoundsException from ByteBuffer.
            throw new SerializationException(
                    "Payload too short to contain a schema id: " + data.length + " byte(s)");
        }
        String subject = getSubjectName(topic, isKey, data,
                new io.confluent.kafka.schemaregistry.avro.AvroSchema(readerSchema.getAvroSchema()));
        final var writerSchemaId = getInt(data, SCHEMA_ID_OFFSET);
        Schema writerRawSchema = (Schema) schemaRegistry.getSchemaBySubjectAndId(subject, writerSchemaId).rawSchema();
        final var schema = new AvroSchema(writerRawSchema).withReaderSchema(readerSchema);
        // The Avro body starts AFTER the whole header; the stream must not include the schema id
        // bytes, otherwise they would be decoded as the beginning of the record.
        final var body = new ByteArrayInputStream(data, HEADER_SIZE, data.length - HEADER_SIZE);
        return avroReader.with(schema).readValue(body);
    }

    /** Reads a 4-byte int from {@code bytes} starting at {@code offset}. */
    private int getInt(byte[] bytes, int offset) {
        return ByteBuffer.wrap(bytes, offset, Integer.BYTES).getInt();
    }

    /** Delegates to the parent {@code close()}. */
    @Override
    @SneakyThrows
    public void close() {
        super.close();
    }
}
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import lombok.SneakyThrows;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
/**
 * Kafka {@link Serializer} that encodes values as Avro using a Jackson {@link ObjectWriter}
 * and frames them as: one magic byte ({@code 0x0}), a 4-byte schema id, then the Avro body.
 *
 * <p>The schema id is obtained from the schema registry, either by registering the writer
 * schema (when {@code autoRegisterSchema} is set) or by looking up its existing id.
 *
 * @param <T> the Java type accepted by {@link #serialize(String, Object)}
 */
public class JacksonConfluentAvroSerializer<T> extends AbstractKafkaAvroSerializer implements Serializer<T> {
    private static final int MAGIC_BYTE = 0x0;

    protected final ObjectWriter avroWriter;
    protected final ParsedSchema writerSchema;
    protected boolean isKey;

    /**
     * Creates a serializer for the given type.
     *
     * @param avroMapper     mapper used to derive the Avro schema and to build the writer
     * @param serializedType Java class of the values to encode
     */
    @SneakyThrows
    public JacksonConfluentAvroSerializer(AvroMapper avroMapper, Class<T> serializedType) {
        // Jackson-side schema; distinct local name so it does not shadow the registry-side field.
        var jacksonSchema = avroMapper.schemaFor(serializedType);
        this.writerSchema = new AvroSchema(jacksonSchema.getAvroSchema());
        this.avroWriter = avroMapper.writerFor(serializedType).with(jacksonSchema);
    }

    /**
     * Configures the underlying Confluent serializer and records whether this instance
     * handles record keys or values (used when computing the subject name).
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        super.configure(new KafkaAvroSerializerConfig(configs));
        this.isKey = isKey;
    }

    /**
     * Encodes {@code data} as a framed Avro payload.
     *
     * @param topic topic the record is being sent to
     * @param data  value to encode, may be {@code null}
     * @return magic byte + schema id + Avro body, or {@code null} when {@code data} is
     *         {@code null} (mirrors the null handling of the matching deserializer and avoids
     *         a registry call for records without a value)
     */
    @Override
    @SneakyThrows
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        var id = retrieveSchemaId(topic, data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(MAGIC_BYTE);
        out.write(getIntBytes(id));
        avroWriter.writeValue(out, data);
        return out.toByteArray();
    }

    /** Encodes {@code id} as 4 bytes in {@link ByteBuffer}'s default (big-endian) order. */
    private static byte[] getIntBytes(int id) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
    }

    /**
     * Resolves the schema id to embed in the payload header.
     *
     * @param topic topic the record is being sent to
     * @param data  value being serialized, passed to the subject name computation
     * @return the id returned by the registry for the writer schema under the computed subject
     */
    @SneakyThrows
    protected int retrieveSchemaId(String topic, T data) {
        String subject = getSubjectName(topic, isKey, data, writerSchema);
        if (autoRegisterSchema) {
            return register(subject, writerSchema, normalizeSchema);
        }
        return schemaRegistry.getId(subject, writerSchema);
    }

    /** Delegates to the parent {@code close()}. */
    @Override
    @SneakyThrows
    public void close() {
        super.close();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment