Skip to content

Instantly share code, notes, and snippets.

@sachnk
Created September 27, 2020 14:13
Show Gist options
  • Save sachnk/dc37790412e9ab4d79a8b3a9f958390f to your computer and use it in GitHub Desktop.
Save sachnk/dc37790412e9ab4d79a8b3a9f958390f to your computer and use it in GitHub Desktop.
EventRouter.java
package io.clearstreet.debezium;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
/**
 * Kafka Connect single message transform that re-targets a record to the Kafka partition
 * named by the row it carries.
 *
 * <p>The record value is expected to be a {@link Struct} with a nested struct field
 * {@code after}, which in turn holds an INT32 field {@code partition}
 * (NOTE(review): this looks like a Debezium change-event envelope -- confirm against the
 * connector that feeds this transform). When that field holds a non-zero value, a copy of
 * the record is emitted with its Kafka partition set to that value; topic, key, value and
 * timestamp are carried over unchanged.
 *
 * <p>The record is returned untouched when any of the following holds:
 * <ul>
 *   <li>the record has no value schema or no value;</li>
 *   <li>the value has no {@code after} field, or {@code after} is null or not a struct;</li>
 *   <li>{@code after} has no {@code partition} field, or it is null or not INT32;</li>
 *   <li>{@code partition} is 0.</li>
 * </ul>
 *
 * @param <R> the concrete record type handled by the Connect runtime
 */
public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final Logger logger = LoggerFactory.getLogger(EventRouter.class);

    /** Purpose string passed to the Requirements helper for its error message. */
    private static final String PURPOSE = "field extraction";

    /** Name of the nested struct, inside the record value, that holds the row. */
    private static final String AFTER_FIELD = "after";

    /** Name of the INT32 field, inside the row, that names the target Kafka partition. */
    private static final String PARTITION_FIELD = "partition";

    /**
     * No-op: this transform has no configuration options.
     *
     * @param props ignored
     */
    @Override
    public void configure(Map<String, ?> props) {
    }

    /** No-op: this transform holds no resources. */
    @Override
    public void close() {
    }

    /**
     * @return an empty definition, since no options are supported
     */
    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    /**
     * Routes {@code record} to the partition named by {@code value.after.partition}, if any.
     *
     * @param record the record to inspect
     * @return a copy of {@code record} with the new partition, or {@code record} itself when
     *         no usable non-zero partition is found
     * @throws DataException if the record has a value schema but its value is not a
     *         {@link Struct}
     */
    @Override
    public R apply(R record) {
        // Runs once per record, so keep tracing at debug to avoid flooding the worker log.
        logger.debug("EventRouter apply: {}, {}, {}", record.topic(), record.kafkaPartition(), record.getClass().getName());

        if (record.valueSchema() == null || record.value() == null) {
            return record;
        }

        final Struct envelope = requireStructOrNull(record.value(), PURPOSE);
        if (envelope == null) {
            return record;
        }

        final Integer target = extractPartition(envelope);
        // 0 is treated as "no explicit target" and leaves the record where it is.
        if (target == null || target.intValue() == 0) {
            return record;
        }

        logger.debug("EventRouter routed: {}, {} -> {}", record.topic(), record.kafkaPartition(), target);
        return record.newRecord(
            record.topic(),
            target,
            record.keySchema(),
            record.key(),
            record.valueSchema(),
            record.value(),
            record.timestamp()
        );
    }

    /**
     * Reads {@code after.partition} from the record value without throwing on records that
     * do not have the expected shape.
     *
     * @param envelope the record value
     * @return the partition value, or {@code null} when it is absent, null, or of the wrong type
     */
    private static Integer extractPartition(Struct envelope) {
        // Struct getters throw DataException for unknown field names, so consult the schema
        // first; this keeps records without an "after" field on the pass-through path.
        if (envelope.schema().field(AFTER_FIELD) == null) {
            return null;
        }
        try {
            final Struct row = envelope.getStruct(AFTER_FIELD);
            if (row == null || row.schema().field(PARTITION_FIELD) == null) {
                return null;
            }
            return row.getInt32(PARTITION_FIELD);
        } catch (DataException e) {
            // Field exists but has an unexpected type: deliberately best-effort, do not route.
            return null;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment