@jsnouffer
Last active August 14, 2020 14:36
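import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
// ... //
StreamsBuilder builder = new StreamsBuilder();
// both topics are read with the default serdes taken from config
KStream<String, JsonObject> inputStream = builder.stream(inputTopic);
KStream<String, JsonObject> resultStream = builder.stream(resultTopic);
// left join with default serializers and deserializers;
// outputValue is null when no result with the same key falls inside the window
// the ValueJoiner returns a String, so the joined stream is KStream<String, String>
KStream<String, String> joined = inputStream.leftJoin(resultStream,
    (inputValue, outputValue) -> "record=" + inputValue + ", results=" + outputValue, /* ValueJoiner */
    // 5-minute window; on Kafka 3.0+ prefer JoinWindows.ofTimeDifferenceWithNoGrace(Duration)
    JoinWindows.of(Duration.ofMinutes(5))
);
// joined values are Strings, so set String serdes here instead of relying on the defaults
joined.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
// config (defined in the elided section above) carries the application id, bootstrap servers and default serdes
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();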
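
The snippet above depends on a running Kafka cluster, so the semantics of the windowed left join are easier to see in a plain-Java sketch. The code below is a simplified batch model, not Kafka code: the `Event` class, the `leftJoin` helper and the sample data are hypothetical names introduced for illustration only. It ignores arrival order, grace periods and state stores, and only shows which pairs match and when the right-hand side is `null`.

```java
import java.util.ArrayList;
import java.util.List;

class WindowedLeftJoinSketch {
    // Hypothetical stand-in for a keyed, timestamped record; not a Kafka class.
    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Same joiner shape as in the gist: the result side may be null.
    static String join(String inputValue, String outputValue) {
        return "record=" + inputValue + ", results=" + outputValue;
    }

    // For every left event, pair it with each right event that has the same key
    // and a timestamp at most windowMs away; emit a null-result row if none match.
    static List<String> leftJoin(List<Event> left, List<Event> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            boolean matched = false;
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    out.add(join(l.value, r.value));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(join(l.value, null));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        List<Event> inputs = new ArrayList<>();
        inputs.add(new Event("k1", "a", 0L));
        inputs.add(new Event("k2", "b", 1_000L));
        List<Event> results = new ArrayList<>();
        results.add(new Event("k1", "x", 60_000L));   // 1 minute after "a": inside the window
        results.add(new Event("k2", "y", 601_000L));  // 10 minutes after "b": outside the window
        for (String row : leftJoin(inputs, results, fiveMinutes)) {
            System.out.println(row);
        }
        // prints:
        // record=a, results=x
        // record=b, results=null
    }
}
```

In the real topology the same rule applies per key as records arrive, and the exact timing of the `null` rows depends on the Kafka Streams version in use.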