Created
April 4, 2023 04:44
-
-
Save wickstjo/f91efbc4d1e527338eaa3dd89f5c7e9a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.myproject | |
import org.apache.flink.streaming.api.scala._ | |
import scala.io.Source | |
import org.apache.flink.connector.kafka.source.{KafkaSource} | |
import org.apache.flink.connector.kafka.sink.{KafkaSink, KafkaRecordSerializationSchema} | |
// import org.apache.flink.formats.avro.{AvroSerializationSchema} | |
import org.apache.flink.formats.avro.registry.confluent.{ConfluentRegistryAvroDeserializationSchema, ConfluentRegistryAvroSerializationSchema} | |
import org.apache.avro.Schema | |
import org.apache.avro.specific.{SpecificRecordBase, SpecificRecord} | |
// import org.apache.avro.generic.{GenericRecord, GenericData} | |
import org.apache.avro.generic.GenericData | |
/**
 * Avro record holding three float measurements: `MotorSpeedMe`, `MotorTorqMe` and `SpeedAs`.
 *
 * `SpecificRecord` is implemented by hand: the schema is declared inline (see the companion
 * object) and fields are read and written by their position in that schema. The fields are
 * `var`s because `put` assigns to them.
 *
 * @param MotorSpeedMe value of schema field 0
 * @param MotorTorqMe  value of schema field 1
 * @param SpeedAs      value of schema field 2
 */
case class MyData(
  var MotorSpeedMe: Float,
  var MotorTorqMe: Float,
  var SpeedAs: Float
) extends SpecificRecord {

  /** No-argument constructor; every field starts at 0.0f. */
  def this() = this(0.0f, 0.0f, 0.0f)

  /** Returns the record schema, parsed once in the companion object. */
  override def getSchema: Schema = MyData.ParsedSchema

  /**
   * Returns the (boxed) value of the field at the given schema position.
   *
   * @throws IndexOutOfBoundsException if the position is not 0, 1 or 2
   */
  override def get(field$: Int): AnyRef = field$ match {
    case 0 => MotorSpeedMe.asInstanceOf[AnyRef]
    case 1 => MotorTorqMe.asInstanceOf[AnyRef]
    case 2 => SpeedAs.asInstanceOf[AnyRef]
    case _ => throw MyData.invalidIndex(field$)
  }

  /**
   * Assigns `value` to the field at the given schema position.
   *
   * `value` is cast to `Float`; a value of another type fails with a `ClassCastException`.
   *
   * @throws IndexOutOfBoundsException if the position is not 0, 1 or 2
   */
  override def put(field$: Int, value: Any): Unit = field$ match {
    case 0 => MotorSpeedMe = value.asInstanceOf[Float]
    case 1 => MotorTorqMe = value.asInstanceOf[Float]
    case 2 => SpeedAs = value.asInstanceOf[Float]
    case _ => throw MyData.invalidIndex(field$)
  }
}

/** Holds the state shared by all [[MyData]] instances. */
object MyData {

  // Parsed once when this object is initialized, so getSchema does not re-parse the JSON
  // on every call.
  private val ParsedSchema: Schema = new Schema.Parser().parse("""
    {
      "namespace": "fiberline_1",
      "type": "record",
      "name": "cat2",
      "fields": [
        {
          "name": "MotorSpeedMe",
          "type": "float"
        },
        {
          "name": "MotorTorqMe",
          "type": "float"
        },
        {
          "name": "SpeedAs",
          "type": "float"
        }
      ]
    }
  """)

  // Builds the out-of-range error; the message includes the offending index.
  private def invalidIndex(index: Int): IndexOutOfBoundsException =
    new IndexOutOfBoundsException(s"Invalid field index: $index")
}
/**
 * Entry point of a small Flink streaming job: three hard-coded float triples are turned into
 * [[MyData]] records and written to a Kafka topic, serialized with
 * `ConfluentRegistryAvroSerializationSchema`.
 */
object Main {

  /**
   * Builds the pipeline and starts it.
   *
   * An explicit `main` is used instead of `extends App` so the job is assembled inside an
   * ordinary method rather than in the object's delayed initializer.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // CREATE STREAM PROCESSING ENVIRONMENT
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // KAFKA TOPIC THE RECORDS ARE WRITTEN TO
    val inputTopic: String = "topic1"

    // CREATE SCHEMA WRAPPER
    // NOTE(review): the topic name is also passed as the schema-registry subject; confirm
    // this matches the subject naming expected by the registry.
    val sinkSerializer = ConfluentRegistryAvroSerializationSchema.forSpecific(
      classOf[MyData],
      inputTopic,
      "http://localhost:8181"
    )

    // CREATE INPUT STREAM
    val inputStream = env.fromCollection(Seq(
      (1.0f, 2.0f, 3.0f),
      (4.0f, 5.0f, 6.0f),
      (7.0f, 8.0f, 9.0f)
    ))

    // CONVERT EACH TUPLE TO A MyData RECORD
    val conversionStream: DataStream[MyData] =
      inputStream.map(item => MyData(item._1, item._2, item._3))

    // SERIALIZE & PUSH RECORDS TO KAFKA
    // The element type is spelled out on both builders so it is not left to inference.
    val recordSerializer = KafkaRecordSerializationSchema.builder[MyData]()
      .setTopic(inputTopic)
      .setValueSerializationSchema(sinkSerializer)
      .build()

    val outputSink: KafkaSink[MyData] = KafkaSink.builder[MyData]()
      .setBootstrapServers("localhost:10001,localhost:10002,localhost:10003")
      .setRecordSerializer(recordSerializer)
      .build()

    // ATTACH CONVERSION STREAM TO SINK
    conversionStream.sinkTo(outputSink)

    // FINALLY, START THE APPLICATION
    env.execute("foobar")
    ()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment