Topic transforming Kafka mirror maker handler
import java.util
import java.util.Collections
import kafka.consumer.BaseConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import scala.util.matching.Regex
* Args are expected to be a string of format:
* regex1=target1;regex2=target2
class TopicChangingMirrorMakerMessageHandler(val args:String)
extends MirrorMakerMessageHandler {
val topicPairs: Array[String] = args.split(';')
val topicConversionMap : Map[Regex,String] = topicPairs map(topicPair => {
val pairParts = topicPair.split('=')
val srcRegex = new Regex(pairParts(0))
val targetTopic = pairParts(1)
srcRegex -> targetTopic
}) toMap
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
val maybeMatchingRegex: Option[Regex] = topicConversionMap.keys find(r => r.findFirstMatchIn(record.topic).isDefined)
val targetTopic: String = if (maybeMatchingRegex.isDefined) topicConversionMap(maybeMatchingRegex.get) else record.topic
// Same behaviour as defaultMirrorMakerMessageHandler, only topic is changed
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, null, record.timestamp, record.key, record.value))
