Created
September 27, 2020 14:08
-
-
Save sachnk/b35f1a162337fca013f888ac9ee2424d to your computer and use it in GitHub Desktop.
EventConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.clearstreet.debezium; | |
import io.confluent.connect.avro.AvroConverterConfig; | |
import io.confluent.connect.avro.AvroData; | |
import io.confluent.connect.avro.AvroDataConfig; | |
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; | |
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; | |
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; | |
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer; | |
import org.apache.avro.generic.GenericData; | |
import org.apache.avro.generic.GenericRecord; | |
import org.apache.avro.Schema.Field; | |
import org.apache.avro.io.Decoder; | |
import org.apache.avro.io.DecoderFactory; | |
import org.apache.avro.specific.SpecificDatumReader; | |
import org.apache.kafka.common.errors.SerializationException; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.data.SchemaAndValue; | |
import org.apache.kafka.connect.data.Struct; | |
import org.apache.kafka.connect.errors.DataException; | |
import org.apache.kafka.connect.storage.Converter; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.util.Map; | |
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; | |
public class EventConverter implements Converter { | |
private final Logger logger = LoggerFactory.getLogger(getClass()); | |
private SchemaRegistryClient schemaRegistry; | |
private EventConverter.Serializer serializer; | |
private AvroData avroData; | |
public EventConverter() { | |
} | |
public EventConverter(SchemaRegistryClient client) { | |
this.schemaRegistry = client; | |
} | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
AvroConverterConfig avroConverterConfig = new AvroConverterConfig(configs); | |
if (this.schemaRegistry == null) { | |
this.schemaRegistry = new CachedSchemaRegistryClient(avroConverterConfig.getSchemaRegistryUrls(), avroConverterConfig.getMaxSchemasPerSubject(), configs); | |
} | |
this.serializer = new EventConverter.Serializer(this.schemaRegistry); | |
this.avroData = new AvroData(new AvroDataConfig(configs)); | |
} | |
public byte[] fromConnectData(String topic, Schema schema, Object record) { | |
final Struct value = requireStruct(record, "lookup"); | |
long id = value.getInt64("id"); | |
logger.info("EventConverter fromConnectData: id={}, {}, {}", id, topic, record.getClass().getName()); | |
// if schemaID isn't an int, we'll throw here | |
int schemaID = value.getInt32("schema_id"); | |
org.apache.avro.Schema avroSchema = null; | |
try { | |
avroSchema = schemaRegistry.getById(schemaID); | |
} catch(IOException e) { | |
logger.error("schemaRegistry getById failed: {}", e.getMessage()); | |
throw new RuntimeException(String.format("schemaRegistry getByID failed for id %s", schemaID), e); | |
} catch(RestClientException e) { | |
logger.error("schemaRegistry restClientException: {}", e.getMessage()); | |
throw new RuntimeException(String.format("schemaRegistry restClientException for id %s", schemaID), e); | |
} | |
if (avroSchema == null) { | |
logger.error("unable to find schema {} in registry!", schemaID); | |
throw new RuntimeException(String.format("unable to find schema %s in registry", schemaID)); | |
} | |
byte[] payload = value.getBytes("data"); | |
SpecificDatumReader<GenericRecord> reader = new SpecificDatumReader<>(avroSchema); | |
Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null); | |
GenericRecord avroObject = null; | |
try { | |
avroObject = reader.read(null, decoder); | |
} catch(IOException e) { | |
logger.error("failed parsing avro message: {}", e.getMessage()); | |
throw new RuntimeException(String.format("failed parsing avro message for id %s and schema id %s", id, schemaID), e); | |
} | |
try { | |
return this.serializer.serialize(avroSchema.getFullName(), avroObject); | |
} catch (SerializationException var5) { | |
throw new DataException(String.format(topic), var5); | |
} | |
} | |
public SchemaAndValue toConnectData(String topic, byte[] value) { | |
throw new RuntimeException("converter only supports serialization"); | |
} | |
private static class Serializer extends AbstractKafkaAvroSerializer { | |
public Serializer(SchemaRegistryClient client) { | |
this.schemaRegistry = client; | |
this.autoRegisterSchema = false; | |
} | |
public byte[] serialize(String subject, Object value) { | |
return this.serializeImpl(subject, value); | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment