@sohangp
Created October 2, 2019 17:57
Custom Transformation on Debezium event
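import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;

public class CustomTransformation<R extends ConnectRecord<R>> implements Transformation<R> {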
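    /**
     * Invoked for every change event captured on the outbox schema.
     * Create events are rerouted to a topic named after the event type;
     * all other records pass through unchanged.
     *
     * @param sourceRecord the Debezium change event
     * @return the rerouted record for creates, otherwise the original record
     */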
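    @Override
    public R apply(R sourceRecord) {
        // Tombstones (null values) and non-Struct values carry no change event; pass them through.
        if (!(sourceRecord.value() instanceof Struct)) {
            return sourceRecord;
        }
        Struct kStruct = (Struct) sourceRecord.value();
        String databaseOperation = kStruct.getString("op");
        // Handle only creates.
        if ("c".equalsIgnoreCase(databaseOperation)) {
            // Read the outbox columns from the row state after the insert.
            Struct after = kStruct.getStruct("after");
            String uuid = after.getString("uuid");
            String payload = after.getString("payload");
            // The lower-cased event type becomes the destination topic.
            String topic = after.getString("event_type").toLowerCase();
            Headers headers = sourceRecord.headers();
            headers.addString("eventId", uuid);
            // Build the event to be published: key = uuid, value = payload (no value schema).
            sourceRecord = sourceRecord.newRecord(topic, null, Schema.STRING_SCHEMA, uuid,
                    null, payload, sourceRecord.timestamp(), headers);
        }
        return sourceRecord;
    }

    // Transformation also requires these; this transform has no options and holds no resources.
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public void configure(Map<String, ?> configs) { }
    @Override public void close() { }
}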
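
For the transformation to run, it has to be registered on the connector as a single message transform. The fragment below is only a sketch of the relevant connector properties: the alias `outbox` and the package `com.example` are placeholders that do not appear in the gist, and the compiled class is assumed to be on the Kafka Connect plugin path.

```properties
# Alias for the transform chain (hypothetical name).
transforms=outbox
# Fully qualified class name; the com.example package is an assumption.
transforms.outbox.type=com.example.CustomTransformation
```

With this in place, Kafka Connect calls `apply` on each record the source connector emits, before the record is written to Kafka.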