Created
September 27, 2020 14:01
-
-
Save sachnk/3c22453c34c88b0181bbec20fb591341 to your computer and use it in GitHub Desktop.
debezium_router.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.clearstreet.debezium; | |
// imports, error-handling, and exception-handling omitted for brevity
public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R> { | |
private static final String PURPOSE = "field extraction"; | |
@Override | |
public R apply(R record) { | |
if (record.valueSchema() == null || record.value() == null) { | |
return record; | |
} | |
final Struct after = requireStructOrNull(record.value(), PURPOSE); | |
if (after == null) { | |
return record; | |
} | |
final Struct value = requireStructOrNull(after.getStruct("after"), PURPOSE); | |
if (value == null) { | |
return record; | |
} | |
int partition = value.getInt32("partition"); | |
// if partition is not 0, then create copy of the record with the new partition | |
if (partition != 0) { | |
return record.newRecord( | |
record.topic(), | |
partition, | |
record.keySchema(), | |
record.key(), | |
record.valueSchema(), | |
record.value(), | |
record.timestamp() | |
); | |
} | |
return record; | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment