Skip to content

Instantly share code, notes, and snippets.

@sachnk
sachnk / debezium_config.json
Created September 27, 2020 14:17
debezium_config.json
{
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": <registry-url>,
"value.converter": "io.clearstreet.debezium.EventConverter",
"value.converter.schema.registry.url": <registry-url>,
"transforms": "router,unwrap",
"transforms.router.type": "io.clearstreet.debezium.EventRouter",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": true,
@sachnk
sachnk / EventRouter.java
Created September 27, 2020 14:13
EventRouter.java
package io.clearstreet.debezium;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;
@sachnk
sachnk / EventConverter.java
Created September 27, 2020 14:08
EventConverter.java
package io.clearstreet.debezium;
import io.confluent.connect.avro.AvroConverterConfig;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import org.apache.avro.generic.GenericData;
@sachnk
sachnk / debezium_router.java
Created September 27, 2020 14:01
debezium_router.java
package io.clearstreet.debezium;
// imports, error-handling, and exception-handling omitted for brevity
public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R> {
private static final String PURPOSE = "field extraction";
@Override
public R apply(R record) {
if (record.valueSchema() == null || record.value() == null) {
@sachnk
sachnk / debezium_payload.json
Created September 27, 2020 13:44
debezium_payload.json
"payload": {
"before": null,
"after": {
"id": 1,
"created_at": "2020-09-27T13:37:24.058468Z",
"column_a": "a",
"column_b": "b"
},
"source": {
"version": "0.9.5.Final",
@sachnk
sachnk / events.sql
Last active September 25, 2020 18:02
events.sql
-- Postgres events table, created in the "myservice" schema.
CREATE TABLE myservice.events (
    -- Auto-incrementing 64-bit key; backs the primary key named events_pk.
    id          BIGSERIAL                NOT NULL CONSTRAINT events_pk PRIMARY KEY,
    -- Insertion time with time zone; filled in by the database when omitted.
    created_at  TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
    -- NOTE(review): presumably identifies the schema used to encode "data" -- confirm.
    schema_id   INTEGER                  NOT NULL,
    -- Nullable; defaults to 0 when not supplied.
    partition   INTEGER                  DEFAULT 0,
    -- Raw bytes; NOTE(review): presumably the serialized event body -- confirm.
    data        BYTEA                    NOT NULL
);