package com.streaming.example

import java.util.Properties

import org.apache.flink.formats.avro.AvroSerializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
/**
 * Resolves Avro schemas against the Confluent Schema Registry and reads/writes data from/to Kafka.
 * Reference: https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
 */
object ConfluentSchemaRegistryExample {

  def main(args: Array[String]): Unit = {
    println("Start writing data to Kafka using Avro serialization/deserialization and the Confluent Schema Registry")

    // Create the streaming execution environment for processing data.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Enable checkpointing: the Flink Kafka consumer periodically checkpoints all its Kafka
    // offsets, together with the state of the other operators, in a consistent manner.
    env.enableCheckpointing(5000)
    // Set the properties required to connect to Kafka.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // Only required for Kafka 0.8.
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    // Start from the earliest offset when the group has no committed offset yet.
    properties.setProperty("auto.offset.reset", "earliest")
    // Create the list of users to send to Kafka.
    val userList = env.fromElements(
      new User("Yogesh", 26, "Yellow"),
      new User("Keshav", 76, "Green"),
      new User("Mahesh", 45, "Blue"))

    // Key the stream by name before writing to Kafka.
    // (Note: for a plain POJO, as opposed to an Avro record, use TypeInformationSerializationSchema instead.)
    val userSource = userList.keyBy("name")
    // Build the Avro serializer for the User instance (an Avro-generated specific record);
    // AvroSerializationSchema is created through its forSpecific factory method.
    val userSerialize: AvroSerializationSchema[User] = AvroSerializationSchema.forSpecific(classOf[User])
    //val userDeserialize: AvroDeserializationSchema[User] = AvroDeserializationSchema.forSpecific(classOf[User])

    val schemaRegistryUrl = "http://localhost:8081"

    // Write the user data to Kafka using Avro serialization.
    // Note: this emits plain Avro bytes without the Confluent wire-format header;
    // see the registry-aware producer sketch after the code.
    userSource.addSink(new FlinkKafkaProducer[User]("test", userSerialize, properties))
    // Read the user data back from Kafka, resolving the writer schema via the Confluent Schema Registry.
    val userKafkaReaderResult = env.addSource(new FlinkKafkaConsumer[User]("test",
      ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[User], schemaRegistryUrl), properties).setStartFromEarliest())

    // Print the records read from Kafka.
    userKafkaReaderResult.print()

    env.execute("Avro Serialization/Deserialization using Confluent Registry Example")
  }
}
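
The User class is assumed to be an Avro-generated specific record; a schema along these lines would match the three-argument constructor calls above. The field names are an assumption, mirroring the User record used in the Flink end-to-end test linked in the header comment:

{
  "namespace": "com.streaming.example",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "int"},
    {"name": "favorite_color", "type": "string"}
  ]
}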
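
As written, the producer emits plain Avro bytes, while ConfluentRegistryAvroDeserializationSchema on the consumer side expects Confluent's wire format (a magic byte and a 4-byte schema id before the Avro payload), so reading back what this job writes may fail. Newer Flink releases (1.12+) ship a registry-aware serializer; a minimal sketch of a drop-in replacement for the addSink line, assuming the registry's default "<topic>-value" subject naming:

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema

// Registers/looks up the schema under the given subject and prepends the
// Confluent wire-format header to every serialized record.
val registryAwareSerializer = ConfluentRegistryAvroSerializationSchema.forSpecific(
  classOf[User],    // the generated Avro specific record
  "test-value",     // subject name: "<topic>-value" under the default naming strategy
  schemaRegistryUrl)

userSource.addSink(new FlinkKafkaProducer[User]("test", registryAwareSerializer, properties))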