Skip to content

Instantly share code, notes, and snippets.

@ova2
Last active July 27, 2021 19:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ova2/7c3f99a74aa1d9c20786f07c88e0bff2 to your computer and use it in GitHub Desktop.
Save ova2/7c3f99a74aa1d9c20786f07c88e0bff2 to your computer and use it in GitHub Desktop.
@Slf4j
public class CacheMono<KEY, IVALUE, OVALUE> {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Map<KEY, CacheMonoValue<OVALUE>> cache = new HashMap<>();
/**
* External value supplier which should be provided if "valuePublisher" with "keyExtractor"
* are not set
*/
private final Function<KEY, Mono<OVALUE>> valueSupplier;
/**
* External source publisher stream which should be provided if "valueSupplier" is not set
*/
private final Flux<IVALUE> valuePublisher;
/**
* Key extractor for emitted items provided by "valuePublisher"
*/
private final Function<IVALUE, KEY> keyExtractor;
/**
* Value extractor for emitted items provided by "valuePublisher"
*/
private final Function<IVALUE, OVALUE> valueExtractor;
private CacheMono(Function<KEY, Mono<OVALUE>> valueSupplier, Flux<IVALUE> valuePublisher,
Function<IVALUE, KEY> keyExtractor, Function<IVALUE, OVALUE> valueExtractor) {
this.valueSupplier = valueSupplier;
this.valuePublisher = valuePublisher;
this.keyExtractor = keyExtractor;
this.valueExtractor = valueExtractor;
}
/**
* Factory method to create a CacheMono instance from an external value supplier. The value
* supplier is called by this CacheMono instance for retrieving values when they are missing
* in cache ("pull" principle to retrieve not yet cached values).
*/
public static <KEY, VALUE> CacheMono<KEY, VALUE, VALUE> fromSupplier(
@NonNull Function<KEY, Mono<VALUE>> valueSupplier) {
Objects.requireNonNull(valueSupplier);
return new CacheMono<>(valueSupplier, null, null, null);
}
/**
* Factory method to create a CacheMono instance from an external value publisher.
* Published values will fill this cache (reactive "push" way).
*/
public static <KEY, VALUE> CacheMono<KEY, VALUE, VALUE> fromPublisher(
@NonNull Flux<VALUE> valuePublisher, @NonNull Function<VALUE, KEY> keyExtractor) {
Objects.requireNonNull(valuePublisher);
Objects.requireNonNull(keyExtractor);
return createCacheMono(valuePublisher, keyExtractor, Function.identity());
}
/**
* Factory method to create a CacheMono instance from an external value publisher.
* Published values will fill this cache (reactive "push" way).
*/
public static <KEY, IVALUE, OVALUE> CacheMono<KEY, IVALUE, OVALUE> fromPublisher(
@NonNull Flux<IVALUE> valuePublisher,
@NonNull Function<IVALUE, KEY> keyExtractor,
@NonNull Function<IVALUE, OVALUE> valueExtractor) {
Objects.requireNonNull(valuePublisher);
Objects.requireNonNull(keyExtractor);
return createCacheMono(valuePublisher, keyExtractor, valueExtractor);
}
private static <KEY, IVALUE, OVALUE> CacheMono<KEY, IVALUE, OVALUE> createCacheMono(
@NonNull Flux<IVALUE> valuePublisher,
@NonNull Function<IVALUE, KEY> keyExtractor,
@NonNull Function<IVALUE, OVALUE> valueExtractor) {
var cacheMono = new CacheMono<>(null, valuePublisher, keyExtractor, valueExtractor);
valuePublisher.doOnEach(signal -> {
if (signal.hasValue()) {
final var inputValue = signal.get();
final var outputSignal = Signal.next(valueExtractor.apply(inputValue));
cacheMono.cache.put(keyExtractor.apply(inputValue),
new CacheMonoValue<>(outputSignal));
} else if (signal.isOnError()) {
if (signal.getThrowable() == null) {
log.error("Error from value publisher");
} else {
log.error("Error from value publisher, message = {}",
signal.getThrowable().getMessage());
}
}
}).subscribe();
return cacheMono;
}
...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment