Skip to content

Instantly share code, notes, and snippets.

@miguno
Last active December 30, 2018 06:28
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save miguno/e3c791f67068efa0cd741d8929a9662a to your computer and use it in GitHub Desktop.
Save miguno/e3c791f67068efa0cd741d8929a9662a to your computer and use it in GitHub Desktop.
WordCount application in Scala 2.11+, using Kafka's Streams API (Kafka version 0.11.0.0)
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import scala.collection.JavaConverters.asJavaIterableConverter
object WordCountApplication {
def main(args: Array[String]) {
val config: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
p
}
val builder: KStreamBuilder = new KStreamBuilder()
val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count("Counts")
wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")
val streams: KafkaStreams = new KafkaStreams(builder, config)
streams.start()
Runtime.getRuntime.addShutdownHook(new Thread(() => {
streams.close(10, TimeUnit.SECONDS)
}))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment