Skip to content

Instantly share code, notes, and snippets.

@adilakhter
Last active September 21, 2018 10:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adilakhter/3dd9914384fef20b0da8ed94defb662c to your computer and use it in GitHub Desktop.
Save adilakhter/3dd9914384fef20b0da8ed94defb662c to your computer and use it in GitHub Desktop.
Flink Kafka read and write example
package org.xiaon
import grizzled.slf4j.Logging
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
/**
 * Example Flink streaming job: reads feedback messages from one Kafka topic,
 * attaches a classification to each message and writes the result, rendered
 * as a string, to another Kafka topic.
 *
 * Program arguments are parsed with `ParameterTool.fromArgs`. Two keys are
 * read with `getRequired`:
 *  - `topic`       – the Kafka topic to consume from
 *  - `outputTopic` – the Kafka topic to produce to
 *
 * Every parsed argument is additionally handed to the Kafka consumer and
 * producer as `Properties`, so Kafka client settings are supplied the same way.
 *
 * The entry point is an explicit `main` method instead of `scala.App`:
 * `App` relies on `DelayedInit`, so fields of the object (including the
 * implicit `TypeInformation` instances below) would not be initialized
 * until `main` has run.
 */
object KafkaReadAndWriteExample extends Logging {

  /** Parallelism used for the local execution environment. */
  val MaxParallelism: Int = 1

  // Explicit type information for the element types flowing through the pipeline.
  implicit val typeInfoString: TypeInformation[String] =
    TypeInformation.of(classOf[String])
  implicit val typeInfoFeedbackWithClassification: TypeInformation[FeedbackWithClassification] =
    TypeInformation.of(classOf[FeedbackWithClassification])

  /**
   * Builds the Kafka -> classify -> Kafka pipeline and runs it.
   *
   * @param args program arguments; must contain the `topic` and `outputTopic` keys
   */
  def main(args: Array[String]): Unit = {
    val parameterTool = ParameterTool.fromArgs(args)

    // Read both required arguments before creating the environment or any
    // connector, so a missing argument is reported before anything is set up.
    val inputTopic = parameterTool.getRequired("topic")
    val outputTopic = parameterTool.getRequired("outputTopic")

    val environment: StreamExecutionEnvironment = setupLocalFlinkEnvironment(MaxParallelism)

    // Consumer and producer each receive their own getProperties result.
    val feedbackConsumer =
      new FlinkKafkaConsumer011(inputTopic, new SimpleFeedbackSchema(), parameterTool.getProperties)
    val feedbackWithClassificationProducer: SinkFunction[String] =
      new FlinkKafkaProducer011[String](outputTopic, new SimpleFeedbackSchema(), parameterTool.getProperties)

    environment
      .addSource(feedbackConsumer)
      .rebalance
      // The sink is a SinkFunction[String], so the classified record is
      // emitted through the case class's toString representation.
      .map(s => classifyFeedback(s).toString)
      .addSink(feedbackWithClassificationProducer)

    environment.execute("Test Job")
  }

  /**
   * Creates a local stream execution environment with event-time semantics,
   * the given parallelism and sysout logging disabled.
   *
   * @param parallelism parallelism applied to the local environment
   * @return the configured environment
   */
  def setupLocalFlinkEnvironment(parallelism: Int): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(parallelism)
    env.getConfig.disableSysoutLogging()
    env
  }

  /**
   * Classifies a single feedback message.
   *
   * Placeholder implementation: every message currently receives the
   * classification "Unknown".
   *
   * @param feedback the raw feedback text
   * @return the feedback paired with its classification
   */
  def classifyFeedback(feedback: String): FeedbackWithClassification = {
    // TODO: GRPC Client executing the model
    FeedbackWithClassification(feedback, "Unknown")
  }

  /** A feedback message together with the classification assigned to it. */
  case class FeedbackWithClassification(feedback: String, classification: String)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment