Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Topic transforming Kafka mirror maker handler
package kafka.tools
import java.util
import java.util.Collections
import kafka.consumer.BaseConsumerRecord
import kafka.tools.MirrorMaker.MirrorMakerMessageHandler
import org.apache.kafka.clients.producer.ProducerRecord
import scala.util.matching.Regex
/**
* Args are expected to be a string of format:
* regex1=target1;regex2=target2
*/
class TopicChangingMirrorMakerMessageHandler(val args:String)
extends MirrorMakerMessageHandler {
val topicPairs: Array[String] = args.split(';')
val topicConversionMap : Map[Regex,String] = topicPairs map(topicPair => {
val pairParts = topicPair.split('=')
val srcRegex = new Regex(pairParts(0))
val targetTopic = pairParts(1)
srcRegex -> targetTopic
}) toMap
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
val maybeMatchingRegex: Option[Regex] = topicConversionMap.keys find(r => r.findFirstMatchIn(record.topic).isDefined)
val targetTopic: String = if (maybeMatchingRegex.isDefined) topicConversionMap(maybeMatchingRegex.get) else record.topic
// Same behaviour as defaultMirrorMakerMessageHandler, only topic is changed
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, null, record.timestamp, record.key, record.value))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.