Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active April 2, 2023 10:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/7ac2120bd219fcf9de7b0b2c8455faa4 to your computer and use it in GitHub Desktop.
Save dacr/7ac2120bd219fcf9de7b0b2c8455faa4 to your computer and use it in GitHub Desktop.
simple kafka stream example / published by https://github.com/dacr/code-examples-manager #348873e7-6a27-4c07-8ccb-8ccfb2548bef/6cbe9a2caf4b530fb051232b74785f344c051d4c
// summary : simple kafka stream example
// keywords : kafka, kafkastream, dsl, streams
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 348873e7-6a27-4c07-8ccb-8ccfb2548bef
// created-on : 2020-04-24T16:56:45Z
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : cs launch --scala 2.13 com.lihaoyi:::ammonite:2.4.0 -M ammonite.Main -- $file
import $ivy.`org.apache.kafka::kafka-streams-scala:2.8.0`
import $ivy.`org.apache.logging.log4j:log4j-api:2.14.1`
import $ivy.`org.apache.logging.log4j:log4j-core:2.14.1`
import $ivy.`org.apache.logging.log4j:log4j-slf4j-impl:2.14.1`
import java.util.Properties
import scala.util.Properties.envOrElse
import java.time.Duration
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.clients.consumer.ConsumerConfig._
// Programmatically set the log4j2 root logger level to WARN, so only warnings and errors are emitted
// by loggers inheriting from the root (presumably to keep the console readable next to the println output).
org.apache.logging.log4j.core.config.Configurator.setRootLevel(org.apache.logging.log4j.Level.WARN)
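/*
Manual try-out, to run against the broker this script connects to.
NOTE: the command names below are abbreviated (presumably shell aliases for the standard
Kafka CLI tools such as kafka-topics / kafka-console-producer / kafka-console-consumer,
with the broker connection options already filled in) - adapt them to your installation.

Create the two topics used by this example:
topics --create --replication-factor 1 --partitions 1 --topic input-topic
topics --create --replication-factor 1 --partitions 1 --topic output-topic

Publish one message to the input topic:
echo 'Hello' | console-producer --topic input-topic

Watch the transformed messages on the output topic:
console-consumer --topic output-topic --group myconsumer
*/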
/** Kafka Streams client configuration used by this script.
  *
  *  - application id is fixed to `kafka-stream-simple`
  *  - bootstrap servers come from the `KAFKA_REMOTE_BROKER` environment variable,
  *    falling back to `127.0.0.1:9092` when it is not set
  *  - auto offset reset is set to `earliest`
  */
val kafkaConfig: Properties = {
  val brokers = envOrElse("KAFKA_REMOTE_BROKER", "127.0.0.1:9092")
  val settings = Seq(
    StreamsConfig.APPLICATION_ID_CONFIG    -> "kafka-stream-simple",
    StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
    AUTO_OFFSET_RESET_CONFIG               -> "earliest"
  )
  val props = new Properties()
  settings.foreach { case (name, value) => props.setProperty(name, value) }
  props
}
/** Logs the incoming record to standard output and returns it wrapped in angle brackets.
  *
  * @param key    key of the received record (only used in the log line)
  * @param record value of the received record
  * @return the record value surrounded by `<` and `>`
  */
def processRecord(key: String, record: String): String = {
  println(s"** received $record with key $key")
  s"<$record>"
}
/** Script entry point: builds a topology that reads `input-topic`, passes every record through
  * `processRecord` (key unchanged) and writes the result to `output-topic`, then starts it and
  * waits for a line on standard input before returning.
  */
@main
def main(): Unit = {
  val topology = new StreamsBuilder()

  // input-topic --(value rewritten by processRecord, key kept)--> output-topic
  topology
    .stream[String, String]("input-topic")
    .map { case (recordKey, recordValue) => (recordKey, processRecord(recordKey, recordValue)) }
    .to("output-topic")

  val app = new KafkaStreams(topology.build(), kafkaConfig)
  app.start()

  // On JVM shutdown, close the streams client, waiting at most 10 seconds.
  sys.addShutdownHook(app.close(Duration.ofSeconds(10)))

  // Required when run as a script: ammonite exits implicitly once the script body ends.
  scala.io.StdIn.readLine("Enter to exit...")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment