Skip to content

Instantly share code, notes, and snippets.

@domnikl
Last active January 17, 2022 10:10
Show Gist options
  • Save domnikl/8060391e7461f89dfd20ed203e814036 to your computer and use it in GitHub Desktop.
Blogpost: Integrating Confluent Schema Registry with Apache Spark applications
fun main() {
    // Consumer configuration.
    // BUG FIX: the original listed "key.serializer"/"value.serializer" —
    // those are producer properties that a KafkaConsumer ignores, leaving it
    // without any deserializers. A consumer needs *deserializers*, and
    // subscribe() additionally requires a "group.id" to join a consumer group.
    val config = mapOf(
        "bootstrap.servers" to "localhost:9092",
        "group.id" to "cat-consumer",
        "schema.registry.url" to "http://localhost:8081",
        "key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
        // deserialize into the generated SpecificRecord type (Cats)
        // instead of GenericRecord
        "specific.avro.reader" to "true",
    )
    val consumer = KafkaConsumer<String, Cats>(config)
    consumer.subscribe(listOf("cats"))

    // Poll loop: each poll blocks for up to 60 seconds waiting for records.
    while (true) {
        val records = consumer.poll(Duration.ofSeconds(60))
        records.forEach {
            // process cat records
        }
    }
}
/**
 * Decodes the Confluent-Avro `value` column of this Dataset using the latest
 * reader schema registered for [topic] (topic-name strategy, value schema) at
 * [schemaRegistryUrl], then flattens the decoded record's fields into
 * top-level columns of the returned Dataset.
 */
fun <T> Dataset<T>.readAvro(topic: String, schemaRegistryUrl: String): Dataset<Row> {
    val abrisConfig = AbrisConfig
        .fromConfluentAvro()
        .downloadReaderSchemaByLatestVersion()
        .andTopicNameStrategy(topic, false)
        .usingSchemaRegistry(schemaRegistryUrl)

    // First project the raw bytes into a single decoded struct column,
    // then expand that struct's fields with "data.*".
    val decoded = this.select(functions.from_avro(col("value"), abrisConfig).`as`("data"))
    return decoded.select("data.*")
}
// Structured-streaming pipeline: read Avro-encoded cat records from Kafka,
// decode them via the Schema Registry, and continuously write them out as
// date-partitioned Parquet files.
val spark = SparkSession
    .builder()
    .appName("CatRecordConsumer")
    .orCreate

spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "cats")
    .option("startingOffsets", "latest")
    // keep the query alive even if offsets were expired/deleted on the broker
    .option("failOnDataLoss", "false")
    .load()
    // BUG FIX: the registry URL must carry its scheme — the registry is
    // addressed as "http://localhost:8081" elsewhere in this file.
    .readAvro("cats", "http://localhost:8081")
    .writeStream()
    .format("parquet")
    .option("mergeSchema", "true")
    // checkpoint directory lets the streaming query recover its progress
    .option("checkpointLocation", "./_checkpoints/cats")
    // NOTE(review): assumes the decoded Avro record has a "date" field —
    // confirm against the Cats schema. A driver program would normally also
    // call awaitTermination() on the query returned by start().
    .partitionBy("date")
    .start("./cats")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment