Created
September 27, 2020 14:13
-
-
Save sachnk/dc37790412e9ab4d79a8b3a9f958390f to your computer and use it in GitHub Desktop.
EventRouter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.clearstreet.debezium; | |
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; | |
import java.util.Map; | |
import org.apache.kafka.connect.connector.ConnectRecord; | |
import org.apache.kafka.connect.data.Struct; | |
import org.apache.kafka.connect.errors.DataException; | |
import org.apache.kafka.connect.transforms.Transformation; | |
import org.apache.kafka.common.config.ConfigDef; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; | |
public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R> { | |
private static final Logger logger = LoggerFactory.getLogger(EventRouter.class); | |
private static final String PURPOSE = "field extraction"; | |
@Override | |
public void configure(Map<String, ?> props) { | |
} | |
@Override | |
public void close() { | |
} | |
@Override | |
public ConfigDef config() { | |
return new ConfigDef(); | |
} | |
@Override | |
public R apply(R record) { | |
logger.info("EventRouter apply: {}, {}, {}", record.topic(), record.kafkaPartition(), record.getClass().getName()); | |
if (record.valueSchema() == null || record.value() == null) { | |
return record; | |
} | |
final Struct after = requireStructOrNull(record.value(), PURPOSE); | |
if (after == null) { | |
return record; | |
} | |
final Struct value = requireStructOrNull(after.getStruct("after"), PURPOSE); | |
if (value == null) { | |
return record; | |
} | |
int partition = 0; | |
try { | |
Integer p = value.getInt32("partition"); | |
if (p == null) { | |
return record; | |
} | |
partition = p; | |
} catch (DataException e) { | |
return record; | |
} | |
if (partition != 0) { | |
logger.info("EventRouter routed: {}, {} -> {}", record.topic(), record.kafkaPartition(), partition); | |
return record.newRecord( | |
record.topic(), | |
partition, | |
record.keySchema(), | |
record.key(), | |
record.valueSchema(), | |
record.value(), | |
record.timestamp() | |
); | |
} | |
return record; | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment