Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save guysmoilov/d5075395020a1cf594c71d17f657db8b to your computer and use it in GitHub Desktop.
Save guysmoilov/d5075395020a1cf594c71d17f657db8b to your computer and use it in GitHub Desktop.
Topic-transforming Kafka MirrorMaker message handler
package kafka.tools
import java.util
import java.util.Collections
import kafka.consumer.BaseConsumerRecord
import kafka.tools.MirrorMaker.MirrorMakerMessageHandler
import org.apache.kafka.clients.producer.ProducerRecord
import scala.util.matching.Regex
/**
 * MirrorMaker message handler that rewrites the destination topic of mirrored records.
 *
 * `args` is expected to be a string of the format:
 * {{{
 * regex1=target1;regex2=target2
 * }}}
 *
 * Each `regexN` is compiled as a regular expression and is considered a match when it is
 * found anywhere in the source topic name (use `^...$` anchors for a whole-name match).
 * Rules are tried in the order they are declared and the first match wins. A record whose
 * topic matches no rule keeps its original topic.
 *
 * A null or empty `args`, and empty entries (e.g. produced by a leading `;`), yield no rules.
 *
 * @param args the `;`-separated list of `regex=target` rules
 * @throws IllegalArgumentException if a non-empty entry lacks a `=` or a target topic
 */
class TopicChangingMirrorMakerMessageHandler(val args: String)
  extends MirrorMakerMessageHandler {

  /** Raw `regex=target` entries, exactly as split out of `args`. */
  val topicPairs: Array[String] = Option(args).map(_.split(';')).getOrElse(Array.empty[String])

  // Rules are kept in declaration order: a plain immutable Map does not guarantee iteration
  // order once it grows, which would make the winner arbitrary when several regexes match.
  private val orderedRules: Vector[(Regex, String)] =
    topicPairs.iterator
      .filter(_.nonEmpty)
      .map { topicPair =>
        val pairParts = topicPair.split('=')
        require(
          pairParts.length >= 2,
          s"Invalid topic mapping '$topicPair': expected format regex=target (full args: '$args')")
        new Regex(pairParts(0)) -> pairParts(1)
      }
      .toVector

  /** The configured rules as a map from source-topic regex to target topic. */
  val topicConversionMap: Map[Regex, String] = orderedRules.toMap

  /**
   * Builds the single producer record for a consumed record, replacing the topic with the
   * target of the first matching rule (or keeping the original topic if none matches).
   *
   * @param record the record consumed from the source cluster
   * @return a one-element list holding the record to produce; key and value are passed through
   */
  override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
    val targetTopic: String = orderedRules
      .collectFirst { case (regex, target) if regex.pattern.matcher(record.topic).find() => target }
      .getOrElse(record.topic)

    // ProducerRecord rejects negative timestamps, so a negative "no timestamp" sentinel is
    // forwarded as null, which leaves the timestamp for the producer to assign.
    val timestamp: java.lang.Long = if (record.timestamp < 0) null else record.timestamp

    // The partition is left null so the producer's partitioner chooses it; only the topic changes.
    Collections.singletonList(
      new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, null, timestamp, record.key, record.value))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment