package com.example.myproject
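
// Flink job: builds MyData records from tuples, serializes them as Avro
// through the Confluent Schema Registry, and writes them to a Kafka topic.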
import org.apache.flink.streaming.api.scala._
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
// import org.apache.flink.formats.avro.AvroSerializationSchema
import org.apache.flink.formats.avro.registry.confluent.{ConfluentRegistryAvroDeserializationSchema, ConfluentRegistryAvroSerializationSchema}
import org.apache.avro.Schema
import org.apache.avro.specific.{SpecificRecord, SpecificRecordBase}

// HAND-WRITTEN AVRO RECORD TYPE; IMPLEMENTS SpecificRecord SO THE CONFLUENT
// (DE)SERIALIZERS CAN READ AND WRITE IT BY FIELD POSITION
case class MyData(
    var MotorSpeedMe: Float,
    var MotorTorqMe: Float,
    var SpeedAs: Float
// ) extends SpecificRecordBase with SpecificRecord {
) extends SpecificRecord {

    // NO-ARG CONSTRUCTOR REQUIRED BY AVRO
    def this() = this(0.0f, 0.0f, 0.0f)

    // AVRO SCHEMA DESCRIBING THE THREE FLOAT FIELDS
    // (PARSED ON EVERY CALL; COULD BE CACHED IN A COMPANION OBJECT)
    override def getSchema: Schema = new Schema.Parser().parse("""
        {
            "namespace": "fiberline_1",
            "type": "record",
            "name": "cat2",
            "fields": [
                { "name": "MotorSpeedMe", "type": "float" },
                { "name": "MotorTorqMe", "type": "float" },
                { "name": "SpeedAs", "type": "float" }
            ]
        }
    """)

    // POSITIONAL GETTER REQUIRED BY IndexedRecord
    override def get(field$: Int): AnyRef = {
        field$ match {
            case 0 => MotorSpeedMe.asInstanceOf[AnyRef]
            case 1 => MotorTorqMe.asInstanceOf[AnyRef]
            case 2 => SpeedAs.asInstanceOf[AnyRef]
            case _ => throw new IndexOutOfBoundsException("Invalid field index: " + field$)
        }
    }

    // POSITIONAL SETTER REQUIRED BY IndexedRecord
    override def put(field$: Int, value: Any): Unit = {
        field$ match {
            case 0 => MotorSpeedMe = value.asInstanceOf[Float]
            case 1 => MotorTorqMe = value.asInstanceOf[Float]
            case 2 => SpeedAs = value.asInstanceOf[Float]
            case _ => throw new IndexOutOfBoundsException("Invalid field index: " + field$)
        }
    }
}
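
// READ-SIDE SKETCH (NOT PART OF THE ORIGINAL GIST): the unused KafkaSource and
// ConfluentRegistryAvroDeserializationSchema imports above suggest a matching
// consumer; the broker list, registry URL, and group id here are assumptions.
object ReadSideSketch {
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

    def buildSource(topic: String): KafkaSource[MyData] = {
        // LOOKS UP THE WRITER SCHEMA IN THE CONFLUENT REGISTRY DURING DESERIALIZATION
        val deserializer = ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[MyData], "http://localhost:8181")
        // ATTACH WITH e.g. env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
        KafkaSource.builder[MyData]()
            .setBootstrapServers("localhost:10001,localhost:10002,localhost:10003")
            .setTopics(topic)
            .setGroupId("mydata-reader")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(deserializer)
            .build()
    }
}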

object Main extends App {

    // CREATE STREAM PROCESSING ENVIRONMENT
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // KAFKA TOPIC THE RECORDS ARE WRITTEN TO
    val input_topic: String = "topic1"

    // CREATE SCHEMA-REGISTRY-AWARE SERIALIZER
    // NOTE: THE SECOND ARGUMENT IS THE REGISTRY SUBJECT; CONFLUENT'S DEFAULT
    // CONVENTION FOR VALUE SCHEMAS IS "<topic>-value", NOT THE BARE TOPIC NAME
    // val sink_serializer = AvroSerializationSchema.forSpecific(classOf[MyData])
    val sink_serializer = ConfluentRegistryAvroSerializationSchema.forSpecific(classOf[MyData], input_topic, "http://localhost:8181")

    // CREATE A BOUNDED INPUT STREAM OF TUPLES
    val input_stream = env.fromCollection(Seq(
        (1.0f, 2.0f, 3.0f),
        (4.0f, 5.0f, 6.0f),
        (7.0f, 8.0f, 9.0f)
    ))

    // CONVERT EACH TUPLE INTO A MyData RECORD
    val conversion_stream: DataStream[MyData] = input_stream.map(item => MyData(item._1, item._2, item._3))

    // SERIALIZE & PUSH RECORDS TO KAFKA
    // EXPLICIT TYPE PARAMETERS KEEP SCALA FROM INFERRING Nothing FOR THE JAVA BUILDERS
    val output_sink: KafkaSink[MyData] = KafkaSink.builder[MyData]()
        .setBootstrapServers("localhost:10001,localhost:10002,localhost:10003")
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder[MyData]()
                .setTopic(input_topic)
                .setValueSerializationSchema(sink_serializer)
                .build()
        )
        .build()

    // ATTACH THE CONVERSION STREAM TO THE SINK
    conversion_stream.sinkTo(output_sink)

    // FINALLY, START THE APPLICATION
    env.execute("foobar")
}
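
// BUILD SKETCH (NOT PART OF THE ORIGINAL GIST): approximate sbt dependencies this
// job needs; the Flink version is an assumption based on the gist's date.
//
// libraryDependencies ++= Seq(
//     "org.apache.flink" %% "flink-streaming-scala"         % "1.16.1",
//     "org.apache.flink" %  "flink-connector-kafka"         % "1.16.1",
//     "org.apache.flink" %  "flink-avro-confluent-registry" % "1.16.1"
// )
// resolvers += "confluent" at "https://packages.confluent.io/maven/"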