Skip to content

Instantly share code, notes, and snippets.

@fhussonnois
Last active September 24, 2020 21:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fhussonnois/9b93cae7fb9c6394cef182a5f05a2570 to your computer and use it in GitHub Desktop.
Save fhussonnois/9b93cae7fb9c6394cef182a5f05a2570 to your computer and use it in GitHub Desktop.
/**
 * Word-count topology that forwards every count update to an {@link EventStream}
 * named {@code "word-count"}.
 *
 * <p>Lines are read from the {@code "text-lines"} topic, split into lower-cased words,
 * counted per word in the {@code "WordCount"} state store, and each updated count is
 * sent to the event stream registered by the constructor.
 *
 * <p>NOTE(review): the class name suggests the event stream is exposed to clients as
 * Server-Sent Events by the hosting framework; that wiring is not visible here.
 */
@Component
public class ServerSentEventsWordCountTopology
        // (O) Extend EventStreamSupport or implement the EventStreamProvider interface
        extends EventStreamSupport
        implements TopologyProvider {

    /** Separator between words: one or more non-word characters. Compiled once, not per record. */
    private static final java.util.regex.Pattern NON_WORD = java.util.regex.Pattern.compile("\\W+");

    /** Event stream that receives one {@code (word, count)} event per count update. */
    private final EventStream<String, Long> wordCountStream;

    /**
     * Creates the {@code "word-count"} event stream with a queue of 10 000 entries and
     * registers it through {@code addEventStream}.
     */
    public ServerSentEventsWordCountTopology() {
        // (1) Create a new EventStream for word counts updates.
        // presumably dropHeadOnLimitReached() discards the oldest queued event once the
        // queue is full — confirm against the LimitHandlers documentation.
        wordCountStream = new EventStream.Builder("word-count")
                .withQueueSize(10_000)
                .withQueueLimitHandler(LimitHandlers.dropHeadOnLimitReached())
                .build();
        // (2) Register the EventStream
        addEventStream(wordCountStream);
    }

    /**
     * Returns the version of this topology.
     *
     * @return the value reported by {@code Version.getVersion()}
     */
    @Override
    public String version() {
        return Version.getVersion();
    }

    /**
     * Builds the word-count topology.
     *
     * @return the topology reading {@code "text-lines"}, counting words into the
     *         {@code "WordCount"} store and sending every update to the event stream
     */
    @Override
    public Topology topology() {
        var builder = new StreamsBuilder();
        builder.<String, String>stream("text-lines")
                .flatMapValues(value -> splitWords(value))
                .groupBy((key, value) -> value)
                .count(Materialized.as("WordCount"))
                .toStream()
                // (3) send record to the EventStream
                .foreach((word, count) -> wordCountStream.send(word, count));
        return builder.build();
    }

    /**
     * Splits a text line into lower-cased words.
     *
     * @param line the record value; may be {@code null}
     * @return the non-empty words of {@code line} in order of appearance; empty when
     *         {@code line} is {@code null} or contains no word characters
     */
    private static Iterable<String> splitWords(final String line) {
        var words = new java.util.ArrayList<String>();
        if (line == null) {
            // A record without a value carries no words; returning nothing avoids
            // a NullPointerException inside the stream task.
            return words;
        }
        // Locale.ROOT keeps the word keys identical regardless of the JVM default locale.
        final String lowered = line.toLowerCase(java.util.Locale.ROOT);
        for (String token : NON_WORD.split(lowered)) {
            // split() yields an empty token for a blank line or a line that starts
            // with a separator; an empty string is not a word and must not be counted.
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment