@sohangp
Last active October 2, 2019 12:45
Start Debezium and Handle Events
...
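@PostConstruct
void start() {
    // EmbeddedEngine is a Runnable, so run it on the executor's thread
    // instead of blocking application startup.
    executor.execute(engine);
}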
...
/*
 * Connect to the PostgreSQL database and invoke `handleEvent()` for each
 * change event that is polled.
 */
this.engine = EmbeddedEngine.create()
        .using(debeziumConfig)
        .notifying(this::handleEvent)
        .build();
...
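private void handleEvent(SourceRecord sourceRecord) {
    Struct sourceRecordValue = (Struct) sourceRecord.value();
    // Records without a value (e.g. tombstones emitted after a delete) carry nothing to publish.
    if (sourceRecordValue == null) {
        return;
    }
    Envelope.Operation operation = Operation.forCode((String) sourceRecordValue.get(OPERATION));
    // Handle only create (insert) events.
    if (operation == Operation.CREATE) {
        Struct after = (Struct) sourceRecordValue.get(AFTER);
        // Flatten the row state after the insert into a column-name -> value map.
        Map<String, Object> polledMessage = after.schema().fields().stream()
                .map(Field::name)
                // Collectors.toMap rejects null values, so skip columns whose value is null.
                .filter(fieldName -> after.get(fieldName) != null)
                .map(fieldName -> Pair.of(fieldName, after.get(fieldName)))
                .collect(toMap(Pair::getKey, Pair::getValue));
        // Publish the message to Kafka.
        kafkaSink.sink(polledMessage);
    }
}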
...