@sachnk · Created September 27, 2020 14:08
EventConverter.java
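// Kafka Connect value converter for Debezium change events whose rows carry a
// pre-serialized Avro payload. It expects a Struct with an int64 "id", an int32
// "schema_id" (the Confluent Schema Registry id of the payload's writer schema),
// and a bytes "data" field; it decodes "data" with the registered schema and
// re-serializes it in the Confluent wire format for downstream Avro consumers.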
package io.clearstreet.debezium;

import io.confluent.connect.avro.AvroConverterConfig;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public class EventConverter implements Converter {

  private final Logger logger = LoggerFactory.getLogger(getClass());
  private SchemaRegistryClient schemaRegistry;
  private EventConverter.Serializer serializer;

  public EventConverter() {
  }

  // Allows injecting a pre-built client (e.g. a mock registry in tests).
  public EventConverter(SchemaRegistryClient client) {
    this.schemaRegistry = client;
  }
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    AvroConverterConfig avroConverterConfig = new AvroConverterConfig(configs);
    if (this.schemaRegistry == null) {
      this.schemaRegistry = new CachedSchemaRegistryClient(
          avroConverterConfig.getSchemaRegistryUrls(),
          avroConverterConfig.getMaxSchemasPerSubject(), configs);
    }
    this.serializer = new EventConverter.Serializer(this.schemaRegistry);
  }
  @Override
  public byte[] fromConnectData(String topic, Schema schema, Object record) {
    final Struct value = requireStruct(record, "lookup");
    long id = value.getInt64("id");
    logger.info("EventConverter fromConnectData: id={}, {}, {}", id, topic,
        record.getClass().getName());

    // If schema_id isn't an int32, getInt32 throws a DataException here.
    int schemaID = value.getInt32("schema_id");

    // Resolve the payload's writer schema from the registry by id.
    org.apache.avro.Schema avroSchema;
    try {
      avroSchema = schemaRegistry.getById(schemaID);
    } catch (IOException e) {
      logger.error("schemaRegistry getById failed: {}", e.getMessage());
      throw new RuntimeException(
          String.format("schemaRegistry getById failed for id %s", schemaID), e);
    } catch (RestClientException e) {
      logger.error("schemaRegistry restClientException: {}", e.getMessage());
      throw new RuntimeException(
          String.format("schemaRegistry restClientException for id %s", schemaID), e);
    }
    if (avroSchema == null) {
      logger.error("unable to find schema {} in registry!", schemaID);
      throw new RuntimeException(
          String.format("unable to find schema %s in registry", schemaID));
    }

    // Decode the raw Avro bytes stored in the row using the schema just fetched.
    byte[] payload = value.getBytes("data");
    SpecificDatumReader<GenericRecord> reader = new SpecificDatumReader<>(avroSchema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
    GenericRecord avroObject;
    try {
      avroObject = reader.read(null, decoder);
    } catch (IOException e) {
      logger.error("failed parsing avro message: {}", e.getMessage());
      throw new RuntimeException(String.format(
          "failed parsing avro message for id %s and schema id %s", id, schemaID), e);
    }

    // Re-serialize in the Confluent wire format, using the schema's full name as subject.
    try {
      return this.serializer.serialize(avroSchema.getFullName(), avroObject);
    } catch (SerializationException e) {
      throw new DataException(
          String.format("failed serializing avro message for topic %s", topic), e);
    }
  }
  @Override
  public SchemaAndValue toConnectData(String topic, byte[] value) {
    throw new RuntimeException("converter only supports serialization");
  }
  // Thin wrapper exposing AbstractKafkaAvroSerializer's serializeImpl, which writes
  // the Confluent wire format: magic byte, 4-byte schema id, then the Avro payload.
  private static class Serializer extends AbstractKafkaAvroSerializer {

    public Serializer(SchemaRegistryClient client) {
      this.schemaRegistry = client;
      this.autoRegisterSchema = false; // schemas must already exist in the registry
    }

    public byte[] serialize(String subject, Object value) {
      return this.serializeImpl(subject, value);
    }
  }
}
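
Below is a minimal, hypothetical sketch (not part of the original gist) of how the converter could be exercised end to end with Confluent's MockSchemaRegistryClient. The TradeBooked schema, subject name, topic, and field values are invented for illustration, and it assumes a pre-5.5 Confluent client, the same era as the deprecated getById call above, where register takes an org.apache.avro.Schema directly.

// Hypothetical usage sketch, assuming a pre-5.5 Confluent schema-registry client.
package io.clearstreet.debezium;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.io.ByteArrayOutputStream;
import java.util.Collections;

public class EventConverterSketch {
  public static void main(String[] args) throws Exception {
    // Register a toy writer schema; "TradeBooked" is a made-up example record.
    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"TradeBooked\",\"namespace\":\"io.clearstreet\","
            + "\"fields\":[{\"name\":\"qty\",\"type\":\"long\"}]}");
    MockSchemaRegistryClient registry = new MockSchemaRegistryClient();
    int schemaId = registry.register("io.clearstreet.TradeBooked", avroSchema);

    // Binary-encode a record the way the database writer would have.
    GenericRecord rec = new GenericData.Record(avroSchema);
    rec.put("qty", 100L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(avroSchema).write(rec, enc);
    enc.flush();

    // Build the Struct shape fromConnectData expects: id, schema_id, data.
    Schema rowSchema = SchemaBuilder.struct()
        .field("id", Schema.INT64_SCHEMA)
        .field("schema_id", Schema.INT32_SCHEMA)
        .field("data", Schema.BYTES_SCHEMA)
        .build();
    Struct row = new Struct(rowSchema)
        .put("id", 1L)
        .put("schema_id", schemaId)
        .put("data", out.toByteArray());

    EventConverter converter = new EventConverter(registry);
    // schema.registry.url is required by AvroConverterConfig even though the
    // injected mock client means no registry is ever contacted.
    converter.configure(Collections.singletonMap("schema.registry.url", "mock://unused"), false);

    byte[] wire = converter.fromConnectData("trades", rowSchema, row);
    System.out.printf("serialized %d bytes in Confluent wire format%n", wire.length);
  }
}

In a real deployment the converter is wired in through the connector configuration instead, e.g. value.converter=io.clearstreet.debezium.EventConverter plus value.converter.schema.registry.url=..., since Kafka Connect strips the value.converter. prefix and hands the remaining properties to configure().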