Skip to content

Instantly share code, notes, and snippets.

@razorcd
Created November 22, 2020 22:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save razorcd/643a214e45a2c383038ed9866bbd935c to your computer and use it in GitHub Desktop.
Save razorcd/643a214e45a2c383038ed9866bbd935c to your computer and use it in GitHub Desktop.
Scaling reactive APIs - www.razorcodes.com
@RequiredArgsConstructor
public class RedisPuller {
private final RedisTemplate<String,String> redisTemplate;
private final Map<String,EventStream> streamsList = new ConcurrentHashMap<>();
public void runPuller() {
//create stream array of StreamOffset objects
Set<StreamOffset<String>> streams = buildStreamOffsets(this.streamsList.values().stream());
StreamOffset<String>[] streamArray = new StreamOffset[streams.size()];
streams.toArray(streamArray);
return streamArray;
if (streamArray.length == 0) return;
//pull batch
List<ObjectRecord<String, T>> eventBatch;
try {
eventBatch = pullBatch(streamArray);
} catch (RedisException redisEx) {
closeAllStreams(streams, "Connection error.")
throw redisEx;
}
//publish events to the API streams
eventBatch.forEach(it -> {
final T event = it.getValue();
//get the EventStream object that matches the current event stream from Redis
EventStream eventStreamMatch = this.streamList.get(it.getStream());
eventStreamMatch.publishEvent(event);
//update offsets in main stream collection
this.streamList().put(it.getStream(), eventStreamMatch.with(it.getId().getTimestamp(), it.getId().getSequence() + 1));
});
}
private Set<StreamOffset<String>> buildStreamOffsets(Stream<EventStream> streams) {
return streams.filter(EventStream::hasListeners)
.map(stream -> StreamOffset.create(stream.getStreamName(), ReadOffset.from(""+stream.getOffsetTimeMs()+"-"+stream.getOffsetCount())))
.collect(Collectors.toSet());
}
private List<ObjectRecord<String,String>> pullBatch(StreamOffset<String>[] streamNamesArray) throws RedisException {
return redisTemplate.opsForStream().read(String.class, streamNamesArray);
}
private void closeAllStreams(Set<StreamOffset<String>> streams, String message) {
streams.values().forEach(streamObj -> streamObj.sendError(message));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment