Apache Kafka Topic Mirroring with Apache Spark
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.slf4j.LoggerFactory

object KafkaMirror extends App {

  val logger = LoggerFactory.getLogger(getClass)

  /**
   * For testing output to a console.
   *
   * @param df A streaming DataFrame
   * @return A DataStreamWriter
   */
  private def streamToConsole(df: sql.DataFrame) = {
    df.writeStream.outputMode(OutputMode.Append()).format("console")
  }
  // Note: the "subscribe" option takes a comma-separated list of literal topic names;
  // use the "subscribePattern" option instead to subscribe by regex.
  private def getKafkaDf(spark: SparkSession, bootstrap: String, topics: String, offsetResetStrategy: OffsetResetStrategy = OffsetResetStrategy.EARLIEST) = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("subscribe", topics)
      .option("startingOffsets", offsetResetStrategy.toString.toLowerCase())
      .load()
  }
  private def streamToKafka(df: sql.DataFrame, bootstrap: String, checkpointLocation: String) = {
    val cols = df.columns
    if (!cols.contains("topic")) {
      val e = new IllegalArgumentException(s"""Dataframe columns must contain 'topic'. Existing cols=[${cols.mkString(", ")}]""")
      logger.error("Unable to stream dataframe to Kafka", e)
      throw e
    }
    // The output topic comes from the dataframe's 'topic' column, not from a writer option
    df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("checkpointLocation", checkpointLocation) // required option
  }
  val spark = SparkSession.builder()
    .appName("Kafka Test")
    .master("local[*]") // TODO: real cluster
    .getOrCreate()

  import spark.implicits._

  val kafkaBootstrap = "localhost:9092" // TODO: real cluster

  val df = getKafkaDf(spark, kafkaBootstrap, "input-topic")

  streamToKafka(
    // Mirror each record to the same partition, renaming the topic from input-topic to output-topic
    df.select($"partition", $"value", regexp_replace($"topic", "^input", "output").as("topic")),
    kafkaBootstrap,
    checkpointLocation = "/tmp/spark-sql-kafka" // TODO: real persistence, and not local disk
  ).start().awaitTermination()
}
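
To compile and run this outside of a Spark shell, the project needs the Spark SQL Kafka connector on the classpath in addition to Spark itself. Below is a minimal build.sbt sketch; the Scala and Spark versions are assumptions (Scala 2.12, Spark 3.1.x), so adjust them to match your cluster. OffsetResetStrategy and the SLF4J API come in transitively from the Kafka connector and Spark.

// Minimal build.sbt sketch; versions here are assumptions, not taken from the gist
scalaVersion := "2.12.15"

val sparkVersion = "3.1.2" // assumption: any Spark 3.x with a matching connector should work

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // provides the "kafka" source and sink used by format("kafka") above
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

When submitting to a cluster with spark-submit instead, the same connector can be supplied at launch time via --packages (for example, org.apache.spark:spark-sql-kafka-0-10_2.12 at the version matching your Spark install).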