@mandar2174
Created April 22, 2019 13:50
package com.streaming.example

import java.util.Properties

import org.apache.flink.formats.avro.AvroSerializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
/**
 * Writes Avro-encoded records to Kafka and reads them back, resolving the Avro schema
 * through the Confluent Schema Registry on the consumer side.
 * Reference: https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
 */
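/*
 * Note (assumption, not part of the original gist): the `User` class used below is not
 * defined in this snippet. For ConfluentRegistryAvroDeserializationSchema.forSpecific to
 * work it must be an Avro SpecificRecord, typically generated by the Avro compiler (or an
 * sbt/maven Avro plugin) from a schema roughly like the one below; the exact field names
 * are guesses based on the constructor calls in main.
 *
 * {
 *   "namespace": "com.streaming.example",
 *   "type": "record",
 *   "name": "User",
 *   "fields": [
 *     {"name": "name",          "type": "string"},
 *     {"name": "age",           "type": "int"},
 *     {"name": "favoriteColor", "type": "string"}
 *   ]
 * }
 */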
object ConfluentSchemaRegistryExample {

  def main(args: Array[String]): Unit = {
    println("Start writing data to Kafka using Avro serialization/deserialization and the Confluent Schema Registry")

    // Create the streaming execution environment for processing the data.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Enable checkpointing: the Flink Kafka consumer periodically checkpoints its Kafka offsets,
    // together with the state of the other operators, in a consistent manner.
    env.enableCheckpointing(5000)

    // Set the properties required to connect to Kafka.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // Only required for Kafka 0.8.
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    // Always read the Kafka topic from the start.
    properties.setProperty("auto.offset.reset", "earliest")
    // Create the list of users that will be sent to Kafka.
    val userList = env.fromElements(
      new User("Yogesh", 26, "Yellow"),
      new User("Keshav", 76, "Green"),
      new User("Mahesh", 45, "Blue"))

    // Key the stream by name before writing it to Kafka.
    // (Note: for a plain POJO without Avro, TypeInformationSerializationSchema could be used instead.)
    val userSource = userList.keyBy("name")
    // Avro serialization/deserialization schemas for the User records (User is an Avro-generated specific record).
    val userSerialize: AvroSerializationSchema[User] = AvroSerializationSchema.forSpecific(classOf[User])
    //val userDeserialize: AvroDeserializationSchema[User] = AvroDeserializationSchema.forSpecific(classOf[User])
    val schemaRegistryUrl = "http://localhost:8081"

    // Write the user data to Kafka using Avro serialization.
    userSource.addSink(new FlinkKafkaProducer[User]("test", userSerialize, properties))
    // Read the user data back from Kafka, fetching the writer schema from the Confluent Schema Registry.
    val userKafkaReaderResult = env.addSource(new FlinkKafkaConsumer[User]("test",
      ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[User], schemaRegistryUrl), properties).setStartFromEarliest())

    // Print the data read from Kafka.
    userKafkaReaderResult.print()

    env.execute("Avro Serialization/Deserialization using Confluent Registry Example")
  }
}
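/*
 * Build sketch (assumption, not part of the original gist): the imports above correspond
 * roughly to the following build.sbt dependencies. The Flink version shown (1.8.0), the use
 * of the universal Kafka connector, and the Scala-suffix choices are assumptions; adjust
 * them to the versions you actually run.
 *
 *   resolvers += "confluent" at "https://packages.confluent.io/maven/"
 *
 *   libraryDependencies ++= Seq(
 *     "org.apache.flink" %% "flink-streaming-scala"         % "1.8.0",
 *     "org.apache.flink" %% "flink-connector-kafka"         % "1.8.0",
 *     "org.apache.flink" %  "flink-avro"                    % "1.8.0",
 *     "org.apache.flink" %  "flink-avro-confluent-registry" % "1.8.0"
 *   )
 *
 * The example also assumes a Kafka broker on localhost:9092 and a Confluent Schema Registry
 * on http://localhost:8081, as configured in the properties above.
 */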