Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
// imports and license left out for clarity
/**
 * Demo Kafka Streams application that re-keys one input stream and then runs two
 * independent aggregations (a plain count and a 5-second windowed count) on the
 * re-keyed stream, with topology optimization enabled.
 *
 * <p>Reads from {@code inputTopic}; writes to {@code count-topic} and
 * {@code windowed-count}.
 */
public class OptimizedStreams {

    /** Number of leading characters of the record value used as the new record key. */
    private static final int KEY_PREFIX_LENGTH = 5;

    /**
     * Builds the topology, starts the streams client and registers a JVM shutdown hook
     * that closes it again.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Optimization is only applied because the same properties are also passed to builder.build(...) below.
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> inputStream = builder.stream("inputTopic");
        // Changing the key marks the stream as needing a repartition before any key-based aggregation.
        final KStream<String, String> changedKeyStream =
                inputStream.selectKey((k, v) -> keyPrefix(v));

        // first repartition
        changedKeyStream.groupByKey(Grouped.as("count-repartition"))
                .count(Materialized.as("count-store"))
                .toStream()
                .to("count-topic", Produced.with(Serdes.String(), Serdes.Long()));

        // second repartition
        changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("windowed-count-store"))
                .toStream()
                .to("windowed-count",
                        Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));

        final Topology topology = builder.build(properties);
        final KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);

        // Close the client on JVM shutdown (e.g. Ctrl-C / SIGTERM) so that stream threads and
        // state stores are released instead of being abandoned.
        Runtime.getRuntime().addShutdownHook(
                new Thread(kafkaStreams::close, "optimized-streams-shutdown-hook"));

        kafkaStreams.start();
    }

    /**
     * Derives the new record key from a record value.
     *
     * @param value the record value; may be {@code null}
     * @return the first {@value #KEY_PREFIX_LENGTH} characters of {@code value}, the whole value
     *         if it is shorter than that, or {@code null} if {@code value} is {@code null}
     */
    private static String keyPrefix(final String value) {
        if (value == null) {
            // A null value would otherwise throw a NullPointerException inside the key selector.
            return null;
        }
        // Guard against values shorter than the prefix length, which would make substring throw.
        return value.length() <= KEY_PREFIX_LENGTH ? value : value.substring(0, KEY_PREFIX_LENGTH);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.