Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple Kafka Streams word-count application
import java.lang.Long
import java.util.Properties
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import scala.collection.JavaConverters.asJavaIterableConverter
/** Minimal Kafka Streams word-count application.
  *
  * Reads text lines from `input-topic-2`, splits each line into lower-cased words,
  * counts occurrences per word in the state store `word-counts`, and writes the
  * running counts (String key, Long value) to `output-topic-2`.
  */
object WordCountApplication {

  private val InputTopic = "input-topic-2"
  private val OutputTopic = "output-topic-2"
  private val CountStoreName = "word-counts"

  /** Separator between words: one or more non-word characters.
    *
    * `(?U)` enables Unicode character classes so letters such as "é" count as word
    * characters instead of splitting "café" into "caf". The pattern is compiled once
    * instead of on every record.
    */
  private val WordSeparator: java.util.regex.Pattern =
    java.util.regex.Pattern.compile("(?U)\\W+")

  /** Streams configuration; both default key and value serdes are String serdes. */
  private def streamsConfiguration: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-scala")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // NOTE(review): zookeeper.connect is deprecated in later Kafka Streams releases;
    // keep it only while the targeted client version still needs it — confirm.
    p.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
    p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    p
  }

  /** Lower-cases `textLine` and splits it into non-empty words.
    *
    * A null line yields no words instead of throwing a NullPointerException from
    * `toLowerCase`. Empty tokens, which `split` produces for an empty line or a line
    * starting with a separator, are dropped so "" is never counted as a word.
    * `Locale.ROOT` keeps lower-casing independent of the JVM's default locale.
    *
    * @param textLine one record value from the input topic (may be null)
    * @return the words of the line, in order, as a Java iterable for `flatMapValues`
    */
  private def tokenize(textLine: String): java.lang.Iterable[String] = {
    val words: Seq[String] = Option(textLine).fold(Seq.empty[String]) { line =>
      WordSeparator.split(line.toLowerCase(java.util.Locale.ROOT)).toSeq.filter(_.nonEmpty)
    }
    words.asJava
  }

  /** Builds the word-count topology, registers a shutdown hook, and starts the app.
    *
    * @param args ignored
    */
  def main(args: Array[String]): Unit = {
    val stringSerde: Serde[String] = Serdes.String()
    val longSerde: Serde[Long] = Serdes.Long()

    val builder: KStreamBuilder = new KStreamBuilder()
    val textLines: KStream[String, String] = builder.stream(InputTopic)
    val wordCounts: KTable[String, Long] = textLines
      .flatMapValues(textLine => tokenize(textLine))
      // Re-key every record by the word itself so counting happens per word.
      .groupBy((_, word) => word)
      .count(CountStoreName)

    // NOTE(review): KTable updates may be held in the Streams record cache until the
    // next commit, so counts can reach the output topic with a delay (unlike a
    // stateless mapValues pipeline). If output seems missing, check
    // commit.interval.ms / cache.max.bytes.buffering.
    wordCounts.to(stringSerde, longSerde, OutputTopic)

    /* The following code works fine:
    val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase())
    uppercasedWithMapValues.to("output-topic")*/

    val streams: KafkaStreams = new KafkaStreams(builder, streamsConfiguration)
    // Register the hook before start() so a shutdown during startup still closes the app.
    sys.addShutdownHook(streams.close())
    streams.start()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.