@adriaanm
Created April 13, 2017 22:25
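// Build definition (presumably build.sbt):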
name := "kafka-wordcount"
version := "1.0"
scalaVersion := "2.12.1"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.10.2.0"
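// Kafka Streams word-count example (presumably WordCountDemoScala.scala):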
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KTable
import java.util.{Locale, Properties}
/**
* Demonstrates, using the high-level KStream DSL, how to implement the WordCount program
* that computes a simple word occurrence histogram from an input text.
*
* In this example, the input stream reads from a topic named "streams-file-input", where the values of messages
* represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record
* is an updated count of a single word.
*
* Before running this example you must create the input topic and the output topic (e.g. via
* bin/kafka-topics.sh --create ...) and write some data to the input topic (e.g. via
* bin/kafka-console-producer.sh); otherwise you won't see any data arriving in the output topic.
* A small Scala producer sketch for that step follows the code below.
*/
object WordCountDemoScala {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)

    // Set the offset reset to earliest so that we can re-run the demo code with the same pre-loaded data.
    // Note: to re-run the demo, you need to use the offset reset tool:
    // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    import scala.collection.JavaConverters._

    val builder = new KStreamBuilder
    val source: KStream[String, String] = builder.stream("streams-file-input")

    val counts: KTable[String, java.lang.Long] = source.
      flatMapValues[String] { _.toLowerCase(Locale.getDefault).split(" ").toList.asJava }.
      map[String, String] { (key, value) => new KeyValue(value, value) }.
      groupByKey.count("Counts")

    // Need to override the value serde to the Long type.
    counts.to(Serdes.String, Serdes.Long, "streams-wordcount-output")

    val streams = new KafkaStreams(builder, props)
    streams.start()

    // Usually a streams application would run forever; in this example we just
    // let it run for some time and then stop, since the input data is finite.
    Thread.sleep(5000L)
    streams.close()
  }
}
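
As a convenience for the "write some data to the input topic" step mentioned in the doc comment, here is a minimal Scala sketch (not part of the original gist) that produces a few made-up lines to streams-file-input instead of using bin/kafka-console-producer.sh. It assumes a broker at localhost:9092 and that the topic already exists; the object name ProduceSampleInput is just an illustrative choice.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ProduceSampleInput {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    // A few made-up lines of text; each record's value is one line, keys are left unset.
    val lines = Seq("all streams lead to kafka", "hello kafka streams", "kafka streams word count")
    lines.foreach { line =>
      producer.send(new ProducerRecord[String, String]("streams-file-input", line))
    }
    producer.close() // flushes any buffered records before exiting
  }
}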
@Rtowne-Janrain
Cool, thanks, Adriaan! This helped me see that I wasn't providing the map types that I had assumed could be inferred.
Cheers,
Robert
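
To make the inference point concrete: when calling the Java KStream API from Scala 2.12, the output type parameters of the KeyValueMapper / ValueMapper SAM types generally cannot be recovered from the lambda body alone, which is why the gist spells out map[String, String] and flatMapValues[String]. A minimal sketch, reusing the source: KStream[String, String] value from the code above:

// Typically fails to infer the output types without annotations:
//   source.map { (key, value) => new KeyValue(value, value) }

// Compiles, because the mapper's output types are pinned explicitly:
val pairs: KStream[String, String] =
  source.map[String, String] { (key, value) => new KeyValue(value, value) }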
