Skip to content

Instantly share code, notes, and snippets.

@razorcd
Created November 22, 2020 22:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save razorcd/32b3da7c02f9f2df93da088e13240a47 to your computer and use it in GitHub Desktop.
Save razorcd/32b3da7c02f9f2df93da088e13240a47 to your computer and use it in GitHub Desktop.
Scaling reactive APIs - www.razorcodes.com
@Value
public class EventStream {
private final String eventStreamId;
private final Long offsetTimeMs;
private final Long offsetCount;
private final Set<FluxSink<Object>> fluxSinks;
public void addStream(FluxSink<Object> newFluxSinkObject) {
synchronized(this) {
fluxSinks.add(newFluxSinkObject);
}
}
public void removeStream(FluxSink<Object> existingFluxSinkObject) {
synchronized(this) {
fluxSinks.remove(existingFluxSinkObject);
}
}
public boolean hasListeners() {
return !fluxSinks.isEmpty();
}
//closes all stream connections with an error message.
public void sendError(String message) {
fluxSinks.forEach(fluxSink -> fluxSink.error(new ConnectionClosedException(message)));
}
//publish an event to all streams
public void publishEvent(String event) {
fluxSinks.forEach(fluxSink -> fluxSink.next(event));
}
//creates a new object with updated offsets
public EventStream withOffset(Long offsetMs, Long offsetCount) {
return new EventStream(this.eventStreamId, offsetMs, offsetCount, this.fluxSinks);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment