@sderosiaux
Created July 31, 2017 22:45
Kafka streams
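import java.util.regex.Pattern
import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStreamBuilder, KTable}

val builder = new KStreamBuilder
val textLines = builder.stream[String, String]("TextLinesTopic")
val pattern = Pattern.compile("\\W+")
// Split each line into lowercase words, then count occurrences per word.
val wordCounts: KTable[String, String] = textLines
  .flatMapValues(line => pattern.split(line.toLowerCase).toIterable.asJava)
  .groupBy { case (_, word: String) => word }
  .count("Counts")
  .mapValues(_.toString)
// KTable#to returns nothing, so it is called separately instead of ending the chain assigned to wordCounts.
wordCounts.to(Serdes.String(), Serdes.String(), "WordsWithCountsTopic")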
// Better use https://github.com/aseigneurin/kafka-streams-scala ?