@jaceklaskowski
Last active January 19, 2019 15:50
Kafka Streams Workshop

Workshop

Exercise: KStream.transformValues

Use KStream.transformValues
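The heart of `transformValues` is a stateful value-to-value function: Kafka Streams calls it once per record, and the state it keeps between records normally lives in a state store. A minimal plain-Scala sketch of that shape (illustrative names, not the workshop's solution):

```scala
// Plain-Scala sketch of a stateful value transformer: the same shape as the
// ValueTransformer that KStream.transformValues invokes per record. The
// running counter here stands in for what would normally be a state store.
class SequencingTransformer {
  private var seen = 0L // state carried across records

  // Transform one record value, updating the state.
  def transform(value: String): String = {
    seen += 1
    s"$seen: $value"
  }
}
```

In the streaming version the class would implement `org.apache.kafka.streams.kstream.ValueTransformer` (with `init`, `transform`, `close`) and be passed to `transformValues` together with the name of its state store.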

Exercise: Using Materialized

val materialized = Materialized.as[String, Long, ByteArrayKeyValueStore]("poznan-state-store")
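`Materialized.as` only names the state store that backs an aggregation such as `KGroupedStream.count(materialized)`. What the store holds is a running count per key; a plain-Scala model of that content (illustrative sketch — the real store is a `ByteArrayKeyValueStore` managed by Kafka Streams):

```scala
import scala.collection.mutable

// Plain-Scala model of what the "poznan-state-store" would hold after
// count(materialized): one running Long counter per key.
class CountStore {
  private val counts = mutable.Map.empty[String, Long]

  // Process one record key, as count() does for each incoming record.
  def add(key: String): Long = {
    val next = counts.getOrElse(key, 0L) + 1
    counts(key) = next
    next
  }

  def get(key: String): Long = counts.getOrElse(key, 0L)
}
```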

Exercise: TopologyTestDriver

FIXME

Exercise: Branch to Even and Odd Numbers

Use KStream.branch and KStream.print with different labels (even and odd).
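`KStream.branch` takes an ordered list of predicates and routes each record to the stream of the first predicate it matches. The predicates for this exercise are plain functions; a sketch of the splitting logic (assuming numeric values):

```scala
// The even/odd predicates the exercise would hand to KStream.branch.
// In the Kafka Streams Scala API this would look roughly like:
//   val Array(evens, odds) =
//     stream.branch((_, v) => v % 2 == 0, (_, v) => v % 2 != 0)
// followed by evens.print(...) and odds.print(...) with different labels.
object BranchSketch {
  val isEven: Long => Boolean = _ % 2 == 0
  val isOdd: Long => Boolean  = _ % 2 != 0

  // Branch semantics: each value lands in the bucket of the first
  // matching predicate; with two complementary predicates this is a partition.
  def branch(values: Seq[Long]): (Seq[Long], Seq[Long]) =
    values.partition(isEven)
}
```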

Streaming Word Count

  1. http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html
  2. Task: write an application that counts the words in a sentence (the record value)
    • KStream.groupBy (returns a KGroupedStream; counting it yields a KTable)
    • KTable.toStream
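The counting logic behind the exercise, in plain Scala. The streaming version would split each sentence value into words, `groupBy` the word, and `count` into a KTable; this sketch shows only the per-sentence count:

```scala
object WordCountSketch {
  // Count the words in one sentence value — the same per-word count that
  // groupBy + count maintains incrementally across the whole stream.
  def countWords(sentence: String): Map[String, Long] =
    sentence
      .toLowerCase
      .split("\\s+")
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.length.toLong }
}
```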
cat sentences.csv | ./bin/kafka-console-producer.sh \
  --broker-list :9092 \
  --topic GroupByApp-input \
  --property parse.key=true \
  --property key.separator=,
./bin/kafka-console-consumer.sh \
    --bootstrap-server :9092 \
    --topic GroupByApp-output \
    --property print.key=true \
    --value-deserializer org.apache.kafka.common.serialization.LongDeserializer
// commit as often as possible so results are emitted immediately (demo setting)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0)

Questions from Kacper

  1. Processor API
  2. Debugging
  3. Running from the command line

Resetting the offset

./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
./bin/kafka-consumer-groups.sh --reset-offsets --to-latest --execute --group HelloWorldApp --bootstrap-server :9092 --topic exercise1-input
./bin/kafka-console-consumer.sh \
    --bootstrap-server :9092 \
    --topic exercise1-output \
    --property print.key=true \
  --key-deserializer org.apache.kafka.common.serialization.LongDeserializer

KStream.map

./bin/kafka-console-producer.sh --broker-list :9092 --topic exercise1-input --property parse.key=true --property key.separator=,

Running from the command line

./bin/kafka-topics.sh --create --topic exercise1-input --zookeeper :2181 --partitions 4 --replication-factor 1
  1. java -jar target/scala-2.12/poznan-workshop-kafka-streams-assembly-0.1.jar
    • props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
    • 2 threads with 2 partitions each
  2. java -jar target/scala-2.12/poznan-workshop-kafka-streams-assembly-0.1.jar
    • 2 application instances
    • 4 threads with 1 partition each
./bin/kafka-topics.sh --alter --topic exercise1-input --partitions 8 --zookeeper :2181

Different client.id

  • props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
  • java -jar target/scala-2.12/poznan-workshop-kafka-streams-assembly-0.1.jar krzysztof
  • java -jar target/scala-2.12/poznan-workshop-kafka-streams-assembly-0.1.jar kacper

Execution

current active tasks: [0_0, 0_1]
./bin/kafka-topics.sh --describe --topic exercise1-input --zookeeper :2181
./bin/kafka-topics.sh --list --zookeeper :2181
./bin/kafka-topics.sh --delete --topic exercise1-input --zookeeper :2181
./bin/kafka-topics.sh --create --topic exercise1-input --zookeeper :2181 --partitions 2 --replication-factor 1

Development

build.sbt:

libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.1.0"

libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.25"

Note "How to define Scala API for Kafka Streams as dependency in build.sbt?"

  import org.apache.kafka.streams.scala._
  import Serdes._
  import ImplicitConversions._

object HelloWorldApp extends App {

  println("Hello World")

  // For Scala devs only
  // Scala API for Kafka Streams
  import org.apache.kafka.streams.scala._
  import Serdes._
  import ImplicitConversions._

  // Step 1. Build the Topology
  val builder = new StreamsBuilder
  import org.apache.kafka.streams.kstream.Printed
  val console = Printed
    .toSysOut[String, String]
    .withLabel("DEBUG")
  builder
    .stream[String, String]("exercise1-input")
    .print(console)

  val topology = builder.build
  println(topology.describe)

  // Step 2. Run the topology
  // Create the execution environment
  val appId = getClass.getName.replace("$", "")
  import org.apache.kafka.streams.KafkaStreams
  val props = new java.util.Properties
  import org.apache.kafka.streams.StreamsConfig
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
  val ks = new KafkaStreams(topology, props)
  // optional
  ks.cleanUp()
  // mandatory
  ks.start()

}
log4j.properties:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

Administration

./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
./bin/kafka-console-producer.sh --broker-list :9092 --topic [topic-name]

// Not needed for now: --property parse.key=true --property key.separator=,
./bin/kafka-console-consumer.sh \
    --bootstrap-server :9092 \
    --topic exercise1-input

// Not needed for now: --property print.key=true

Requirements

  • Laptop + charger
  • Java Standard Edition (Java 8 through Java 11)
  • Apache Kafka (downloaded)
  • IntelliJ IDEA
    • (optional) Scala plugin
  • sbt