Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Topic-transforming Kafka MirrorMaker message handler
import java.util
import java.util.Collections
import kafka.consumer.BaseConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import scala.util.matching.Regex
/**
 * MirrorMaker message handler that rewrites the destination topic of mirrored records.
 *
 * Args are expected to be a string of format:
 *   regex1=target1;regex2=target2
 *
 * Each regex is searched for anywhere inside the source topic name; it is not
 * anchored to the whole name, so use `^...$` when an exact match is required.
 * Rules are tried in the order they appear in `args` and the first matching rule
 * wins. A record whose topic matches no rule keeps its original topic.
 *
 * An invalid regex fails at construction time, because `Regex` compiles its
 * pattern eagerly.
 *
 * @param args semicolon-separated list of `regex=targetTopic` rules; blank
 *             segments (e.g. an empty string or a doubled `;`) are ignored
 * @throws IllegalArgumentException if a non-blank rule lacks a `=targetTopic` part
 */
class TopicChangingMirrorMakerMessageHandler(val args: String)
  extends MirrorMakerMessageHandler {

  /** The non-blank `regex=target` rules, in the order they were declared. */
  val topicPairs: Array[String] = args.split(';').filter(_.trim.nonEmpty)

  /**
   * Compiled rules mapping a source-topic regex to its target topic.
   *
   * `Regex` uses identity equality, so a plain hash-based map would iterate in an
   * arbitrary order; a `ListMap` is used to keep declaration order and make the
   * "first matching rule wins" behaviour deterministic.
   */
  val topicConversionMap: Map[Regex, String] = {
    val rules = topicPairs.map { topicPair =>
      topicPair.split('=') match {
        // Anything after a second '=' is ignored.
        case Array(srcPattern, targetTopic, _*) => new Regex(srcPattern) -> targetTopic
        case _ =>
          throw new IllegalArgumentException(
            s"Invalid topic mapping '$topicPair': expected 'regex=targetTopic'")
      }
    }
    scala.collection.immutable.ListMap(rules: _*)
  }

  /**
   * Builds the single producer record to emit for a consumed record.
   *
   * @param record the record read from the source cluster
   * @return a one-element list holding a record with the same key and value,
   *         addressed to the mapped topic (or the original topic if no rule matches)
   */
  override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
    val targetTopic: String = topicConversionMap
      .collectFirst { case (regex, target) if regex.findFirstIn(record.topic).isDefined => target }
      .getOrElse(record.topic)

    // ProducerRecord rejects negative timestamps; a negative value on the consumed
    // record means "no timestamp", so pass null and let the producer assign one.
    val timestamp: java.lang.Long = if (record.timestamp < 0) null else record.timestamp

    // Same behaviour as defaultMirrorMakerMessageHandler, only topic is changed.
    // The null partition leaves partition selection to the producer.
    Collections.singletonList(
      new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, null, timestamp, record.key, record.value))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment