Skip to content

Instantly share code, notes, and snippets.

@sachnk
Created September 27, 2020 14:01
Show Gist options
  • Save sachnk/3c22453c34c88b0181bbec20fb591341 to your computer and use it in GitHub Desktop.
Save sachnk/3c22453c34c88b0181bbec20fb591341 to your computer and use it in GitHub Desktop.
debezium_router.java
package io.clearstreet.debezium;
// imports, error-handling, and exception-handling omitted for brevity
/**
 * Kafka Connect single-message transform that routes a record to the Kafka
 * partition named by the {@code partition} column of its row image.
 *
 * <p>The record value is expected to be a change-event envelope: a
 * {@link Struct} with a nested {@code after} struct holding the row state
 * (presumably the Debezium envelope, judging by the package name — confirm
 * against the connector configuration). The {@code partition} field of that
 * nested struct is read as an INT32 and, when it is a non-zero value, used as
 * the explicit partition of a copy of the record.
 *
 * <p>Records that carry no routing information are returned unchanged:
 * tombstones and schemaless records, values without an {@code after} field or
 * with a null {@code after} (e.g. deletes), and rows whose {@code partition}
 * field is missing, null, or {@code 0}.
 *
 * <p>NOTE(review): the remaining {@code Transformation} methods
 * ({@code config()}, {@code configure(...)}, {@code close()}) are not shown in
 * this listing; they must exist for the class to compile.
 *
 * @param <R> the concrete record type flowing through the connector
 */
public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String PURPOSE = "field extraction";

    /** Envelope field holding the row image after the change. */
    private static final String AFTER_FIELD = "after";

    /** Column in the row image that names the target Kafka partition. */
    private static final String PARTITION_FIELD = "partition";

    /**
     * Re-targets {@code record} to the partition stored in its row image.
     *
     * @param record the record to inspect
     * @return a copy of {@code record} assigned to the requested partition when
     *         the row names a non-zero partition; otherwise {@code record} itself
     * @throws DataException if the record value is not a {@link Struct}, or if
     *         {@code after} / {@code partition} exist but have an unexpected type
     */
    @Override
    public R apply(R record) {
        // Tombstones and schemaless records carry nothing to route on.
        if (record.valueSchema() == null || record.value() == null) {
            return record;
        }

        final Struct envelope = requireStructOrNull(record.value(), PURPOSE);
        // Values that are not change-event envelopes (no "after" field) pass
        // through untouched instead of failing the whole task.
        if (envelope == null || envelope.schema().field(AFTER_FIELD) == null) {
            return record;
        }

        // "after" is null when there is no row image to read the partition from.
        final Struct row = envelope.getStruct(AFTER_FIELD);
        if (row == null || row.schema().field(PARTITION_FIELD) == null) {
            return record;
        }

        // getInt32 returns a boxed Integer; keep it boxed so a NULL column does
        // not blow up with a NullPointerException on auto-unboxing.
        final Integer partition = row.getInt32(PARTITION_FIELD);
        if (partition == null || partition == 0) {
            return record;
        }

        // Non-zero partition: copy the record, overriding only its partition.
        return record.newRecord(
                record.topic(),
                partition,
                record.keySchema(),
                record.key(),
                record.valueSchema(),
                record.value(),
                record.timestamp());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment