Skip to content

Instantly share code, notes, and snippets.

@ova2
Last active July 27, 2021 19:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ova2/728c48ef913d1d27f1ef5634709722bd to your computer and use it in GitHub Desktop.
Save ova2/728c48ef913d1d27f1ef5634709722bd to your computer and use it in GitHub Desktop.
/**
* Finds a value by key in an in-memory cache or load it from a remote source.
* The loaded value will be cached.
*/
public Mono<OVALUE> lookup(KEY key) {
return Mono.defer(() -> getValueAsMono(key)
.switchIfEmpty(Mono.defer(() -> onCacheMissResume(key)))
);
}
private Mono<OVALUE> getValueAsMono(KEY key) {
final Lock readLock = lock.readLock();
readLock.lock();
try {
return Mono.justOrEmpty(cache.get(key)).flatMap(CacheMonoValue::toMono);
} finally {
readLock.unlock();
}
}
private Mono<OVALUE> onCacheMissResume(KEY key) {
final Lock writeLock = lock.writeLock();
writeLock.lock();
try {
// check if value was already cached by another thread
final var cachedValue = cache.get(key);
if (cachedValue == null) {
final Mono<OVALUE> monoValue;
if (valuePublisher != null) {
// get value from external value publisher
monoValue = valuePublisher
.filter(value -> Objects.equals(keyExtractor.apply(value), key))
.map(valueExtractor)
.next();
} else if (valueSupplier != null) {
// get value from external supplier
monoValue = valueSupplier.apply(key);
} else {
throw new IllegalStateException("Value can be not determined," +
"neither valuePublisher nor valueSupplier were set");
}
// cache Mono as value immediately
cache.put(key, new CacheMonoValue<>(monoValue));
// cache success and error values encapsulated in signal when it is available
return monoValue.doOnEach(signal -> {
if (signal.isOnNext()) {
cache.put(key, new CacheMonoValue<>(
Signal.next(Objects.requireNonNull(signal.get())))
);
} else if (signal.isOnError()) {
final Signal<OVALUE> errorSignal;
if (signal.getThrowable() == null) {
errorSignal = Signal.error(
new Throwable("Getting value from external provider failed"));
} else {
errorSignal = Signal.error(signal.getThrowable());
}
cache.put(key, new CacheMonoValue<>(errorSignal));
}
});
}
return Mono.justOrEmpty(cachedValue).flatMap(CacheMonoValue::toMono);
} finally {
writeLock.unlock();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment