Skip to content

Instantly share code, notes, and snippets.

@fhussonnois
Created November 26, 2019 20:26
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save fhussonnois/43cc27dd5e3b5143f574040d6f2d58c8 to your computer and use it in GitHub Desktop.
Save fhussonnois/43cc27dd5e3b5143f574040d6f2d58c8 to your computer and use it in GitHub Desktop.
Kafka Streams WordCount Example
package example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
public class WordCount {
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> textLines = builder
.stream("streams-plaintext-input");
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.as("WordCount"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
Properties props = new Properties();
props.put(APPLICATION_ID_CONFIG, "streams-word-count");
props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(
new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment