Skip to content

Instantly share code, notes, and snippets.

@nitayk
Last active December 26, 2019 10:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nitayk/50da33b7bcce19ad0a7f8244d309cb8f to your computer and use it in GitHub Desktop.
Save nitayk/50da33b7bcce19ad0a7f8244d309cb8f to your computer and use it in GitHub Desktop.
package com.supersonic.bos.consumer
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.Properties
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, ByteArrayWindowStore, Serdes, StreamsBuilder}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import scala.util.{Random, Try}
object MissingPartitionExample extends App {
val config: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "missing-partitions-example")
val bootstrapServers = if (args.length > 0) args(0) else "localhost:9092"
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
p
}
implicit val stringSerde: Serde[String] = Serdes.String
implicit val integerSerde: Serde[Integer] = Serdes.JavaInteger
implicit val intSerde: Serde[Int] = Serdes.Integer
implicit val groupedStrInt: Grouped[String, Int] = Grouped.`with`(Serdes.String, Serdes.Integer)
val builder = new StreamsBuilder()
val input: KStream[String, String] = builder.stream[String, String]("input_numbers_topic")(Consumed.`with`(Serdes.String, Serdes.String))
val branched = input
.peek((k, v) => println(s"input: $k -> $v"))
.flatMapValues(str => Try(str.toInt).toOption)
.branch(
(_, v) => v % 2 == 0,
(_, v) => v % 2 != 0
)
val r = Random.nextInt(2)
val timeWindow = TimeWindows.of(Duration.of(1, ChronoUnit.MINUTES)).grace(Duration.of(0, ChronoUnit.MINUTES))
val i1 = branched(0)
.map((k, v) => s"$r" -> v)
.peek((k, v) => println(s"even: $k -> $v"))
.groupByKey
.windowedBy(timeWindow)
.reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, ByteArrayWindowStore]("even_store"))
.toStream((windowedKey, _) => windowedKey.key())
.peek((k, v) => println(s"even toStream: $k -> $v"))
.groupByKey
.reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, ByteArrayKeyValueStore]("even_store_2"))
val i2 = branched(1)
.map((k, v) => s"$r" -> v)
.peek((k, v) => println(s"odd: $k -> $v"))
.groupByKey
.windowedBy(timeWindow)
.reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, ByteArrayWindowStore]("odd_store"))
.toStream((windowedKey, _) => windowedKey.key())
.peek((k, v) => println(s"even toStream: $k -> $v"))
.groupByKey
.reduce((v1, v2) => v1 + v2)(Materialized.as[String, Int, ByteArrayKeyValueStore]("odd_store_2"))
i1.outerJoin(i2)((v1, v2) => v1 + v2)
.toStream
.peek((k, v) => println(s"join: $k -> $v"))
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.cleanUp()
streams.start()
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
sys.ShutdownHookThread {
val _ = streams.close(Duration.ofSeconds(10))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment