Topic transforming Kafka mirror maker handler
package kafka.tools | |
import java.util | |
import java.util.Collections | |
import kafka.consumer.BaseConsumerRecord | |
import kafka.tools.MirrorMaker.MirrorMakerMessageHandler | |
import org.apache.kafka.clients.producer.ProducerRecord | |
import scala.util.matching.Regex | |
/** | |
* Args are expected to be a string of format: | |
* regex1=target1;regex2=target2 | |
*/ | |
class TopicChangingMirrorMakerMessageHandler(val args:String) | |
extends MirrorMakerMessageHandler { | |
val topicPairs: Array[String] = args.split(';') | |
val topicConversionMap : Map[Regex,String] = topicPairs map(topicPair => { | |
val pairParts = topicPair.split('=') | |
val srcRegex = new Regex(pairParts(0)) | |
val targetTopic = pairParts(1) | |
srcRegex -> targetTopic | |
}) toMap | |
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = { | |
val maybeMatchingRegex: Option[Regex] = topicConversionMap.keys find(r => r.findFirstMatchIn(record.topic).isDefined) | |
val targetTopic: String = if (maybeMatchingRegex.isDefined) topicConversionMap(maybeMatchingRegex.get) else record.topic | |
// Same behaviour as defaultMirrorMakerMessageHandler, only topic is changed | |
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, null, record.timestamp, record.key, record.value)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment