Skip to content

Instantly share code, notes, and snippets.

@razorcd
Created November 22, 2020 22:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save razorcd/0d8ad4bf60f83e898f8b4bd84d084dc2 to your computer and use it in GitHub Desktop.
Save razorcd/0d8ad4bf60f83e898f8b4bd84d084dc2 to your computer and use it in GitHub Desktop.
Scaling reactive APIs - www.razorcodes.com
private final Map<String,EventStream> streamsList = new ConcurrentHashMap<>();
public Flux<CustomerUpdatedEvent> getStreamEvents(String eventStreamId, long fromMs) {
//add new eventStream to Set
EventStream eventStream = new EventStream(eventStreamId, fromMs, 0, new HashSet());
streamsList.putIfAbsent(eventStreamId, eventStream);
//create a Flux and add the Sink to the EventStream to be used for publishing
Flux<Object> events = Flux.create(sink -> eventStream.addStream(sink));
//configure the Flux and return it
return events
.cast(CustomerUpdatedEvent.class)
.timeout(Duration.ofMinutes(60))
.doFinally(e -> {
eventStream.removeStream(eventStream);
log.debug("Finally UNRegistering consumer:{}. Cause:{}.", eventStreamId, e);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment