Skip to content

Instantly share code, notes, and snippets.

@ivanursul
Created April 10, 2017 08:11
Show Gist options
  • Save ivanursul/dcd4bb382c05843606a96417561b4b31 to your computer and use it in GitHub Desktop.
Save ivanursul/dcd4bb382c05843606a96417561b4b31 to your computer and use it in GitHub Desktop.
package org.kafka.examples;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Pattern;
/**
 * Kafka Streams demo: counts words from two text topics and joins one of the resulting
 * count streams against a {@link GlobalKTable} built from the other.
 *
 * <p>Topology built by {@link #main}:
 * <ol>
 *   <li>{@code WordsWithCountsTopic} is read as a global table (store {@code WordCounts}).</li>
 *   <li>Words from {@code TextLinesTopic} are counted (store {@code Counts}) and the counts are
 *       written to {@code WordsWithCountsTopic}.</li>
 *   <li>Words from {@code TextLinesTopic1} are counted (store {@code Counts1}) and the counts are
 *       written to {@code WordsWithCountsTopic1}.</li>
 *   <li>{@code WordsWithCountsTopic1} is joined by word against the global table; each joined
 *       pair is printed to stdout and written to {@code MergedTopic}.</li>
 * </ol>
 */
public class Main {

    /**
     * Splits on runs of non-word characters. {@code UNICODE_CHARACTER_CLASS} makes {@code \W}
     * Unicode-aware so non-ASCII letters are kept inside words. Compiled once (a {@link Pattern}
     * is immutable and thread-safe) instead of once per record.
     */
    private static final Pattern NON_WORD_CHARS =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    /**
     * Builds the topology described on the class, starts it, and registers a JVM shutdown hook
     * that closes the {@link KafkaStreams} instance.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();

        // Nodes are added in the same order as before so the builder's auto-generated
        // processor names stay unchanged.
        GlobalKTable<String, Long> globalKTable =
                builder.globalTable(stringSerde, longSerde, "WordsWithCountsTopic", "WordCounts");

        countWords(builder, stringSerde, longSerde, "TextLinesTopic", "Counts", "WordsWithCountsTopic");
        countWords(builder, stringSerde, longSerde, "TextLinesTopic1", "Counts1", "WordsWithCountsTopic1");

        KStream<String, Long> wordsWithCountsTopic1 =
                builder.stream(stringSerde, longSerde, "WordsWithCountsTopic1");
        wordsWithCountsTopic1
                .join(
                        globalKTable,
                        // Look up the global table by the stream record's key (the word).
                        (word, count) -> word,
                        // NOTE(review): the global-table count (count2) is ignored and the stream
                        // count is simply negated — confirm this is the intended merge.
                        (count1, count2) -> count1 * -1L)
                .map((k, v) -> {
                    System.out.println("Joined key-value: " + k + ": " + v);
                    return KeyValue.pair(k, v);
                })
                .to(stringSerde, longSerde, "MergedTopic");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
        // Close the streams instance when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    /**
     * Adds a word-count sub-topology to {@code builder}: reads text lines from
     * {@code inputTopic}, splits them into lower-cased words, counts occurrences per word in the
     * state store {@code storeName}, and writes the running counts (word -> count) to
     * {@code outputTopic}.
     *
     * @param builder     builder to add the sub-topology to
     * @param stringSerde serde for the string keys/values
     * @param longSerde   serde for the count values written to {@code outputTopic}
     * @param inputTopic  topic containing text lines
     * @param storeName   name of the state store backing the count
     * @param outputTopic topic receiving the per-word counts
     */
    private static void countWords(KStreamBuilder builder, Serde<String> stringSerde, Serde<Long> longSerde,
                                   String inputTopic, String storeName, String outputTopic) {
        builder.stream(stringSerde, stringSerde, inputTopic)
                .flatMapValues(Main::splitIntoWords)
                // Re-key each record by the word so that grouping by key groups identical words.
                .map((key, word) -> new KeyValue<>(word, word))
                .groupByKey()
                .count(storeName)
                .toStream()
                .to(stringSerde, longSerde, outputTopic);
    }

    /**
     * Lower-cases {@code line} and splits it into words on runs of non-word characters.
     *
     * @param line a text line; may be {@code null}
     * @return the non-empty words in their original order; empty for a {@code null} line or a
     *         line with no word characters
     */
    private static List<String> splitIntoWords(String line) {
        if (line == null) {
            // A null value has no words; calling toLowerCase() on it would throw an NPE.
            return Collections.emptyList();
        }
        // Locale.ROOT gives locale-independent lower-casing; e.g. under a Turkish default
        // locale, 'I' would otherwise become a dotless 'ı'.
        String[] tokens = NON_WORD_CHARS.split(line.toLowerCase(Locale.ROOT));
        List<String> words = new ArrayList<>(tokens.length);
        for (String token : tokens) {
            // split() yields a leading "" when the line starts with a delimiter, and [""] for an
            // empty line; those must not be counted as words.
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment