@OneCricketeer · Created November 9, 2021 18:31
Apache Kafka Topic Mirroring with Apache Spark
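A minimal Spark Structured Streaming job that mirrors records from input-topic to output-topic on the same broker, preserving each record's partition and rewriting the topic name.
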
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.slf4j.LoggerFactory
object KafkaMirror extends App {
  val logger = LoggerFactory.getLogger(getClass)

  /**
   * For testing output to a console.
   *
   * @param df A Streaming DataFrame
   * @return A DataStreamWriter
   */
  private def streamToConsole(df: sql.DataFrame) = {
    df.writeStream.outputMode(OutputMode.Append()).format("console")
  }
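
  // Usage sketch for local debugging (assumes a streaming dataframe `df` is in scope):
  //   streamToConsole(df).start().awaitTermination()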
  private def getKafkaDf(spark: SparkSession,
                         bootstrap: String,
                         topicPattern: String,
                         offsetResetStrategy: OffsetResetStrategy = OffsetResetStrategy.EARLIEST) = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("subscribe", topicPattern)
      // Spark accepts "earliest" or "latest" here; OffsetResetStrategy.NONE is not a valid value
      .option("startingOffsets", offsetResetStrategy.toString.toLowerCase())
      .load()
  }
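
  // Note: "subscribe" expects a comma-separated list of literal topic names. To match
  // topics by regex (as the parameter name suggests), this would instead use
  //   .option("subscribePattern", topicPattern)   // e.g. "input-.*"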
  private def streamToKafka(df: sql.DataFrame, bootstrap: String, checkpointLocation: String) = {
    val cols = df.columns
    if (!cols.contains("topic")) {
      val e = new IllegalArgumentException(s"""Dataframe columns must contain 'topic'. Existing cols=[${cols.mkString(", ")}]""")
      logger.error("Unable to stream dataframe to Kafka", e)
      throw e
    }
    // The output topic comes from the dataframe's 'topic' column, not from writer options
    df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("checkpointLocation", checkpointLocation) // required option for Kafka sinks
  }
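
  // The Kafka sink requires a 'value' column; 'key', 'topic', 'partition', and
  // 'headers' are optional (here the topic is carried in the dataframe itself).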
  val spark = SparkSession.builder()
    .appName("Kafka Test")
    .master("local[*]") // TODO: real cluster
    .getOrCreate()

  import spark.implicits._

  val kafkaBootstrap = "localhost:9092" // TODO: real cluster
  val df = getKafkaDf(spark, kafkaBootstrap, "input-topic")

  streamToKafka(
    // Mirror records partition-for-partition, rewriting the topic name from input-* to output-*
    df.select($"partition", $"value", regexp_replace($"topic", "^input", "output").as("topic")),
    kafkaBootstrap,
    checkpointLocation = "/tmp/spark-sql-kafka" // TODO: real persistence, and not local disk
  ).start().awaitTermination()
}
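
To run this, the Kafka source/sink for Structured Streaming must be on the classpath, either via --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 on spark-submit or as a build dependency. A minimal sbt sketch, assuming Scala 2.12 and Spark 3.2.0 (match these versions to your cluster; kafka-clients, which provides OffsetResetStrategy, is pulled in transitively):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.2.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.2.0"
)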