import com.github.benmanes.caffeine.cache.AsyncLoadingCache; | |
import com.github.benmanes.caffeine.cache.Caffeine; | |
import com.github.benmanes.caffeine.cache.LoadingCache; | |
import org.jetbrains.annotations.NotNull; | |
import reactor.core.publisher.Mono; | |
import reactor.core.scheduler.Schedulers; | |
import java.time.Duration; | |
import java.util.function.Function; | |
import java.util.function.Supplier; | |
public class CachingUtils { | |
private static final String FIXED_KEY = "FIXED_KEY"; | |
private CachingUtils() { | |
} | |
public static <T> Function<String, T> of(@NotNull Duration duration, @NotNull Function<String, T> fn) { | |
final LoadingCache<String, T> cache = Caffeine.newBuilder() | |
.expireAfterWrite(duration) | |
.build((String k) -> fn.apply(k)); | |
return (String key) -> cache.get(key); | |
} | |
public static <T> Supplier<T> of(@NotNull Duration duration, @NotNull Supplier<T> supplier) { | |
Function<String, T> fn = of(duration, k -> supplier.get()); | |
return () -> fn.apply(FIXED_KEY); | |
} | |
public static <T> Function<String, Mono<T>> ofMono(@NotNull Duration duration, @NotNull Function<String, Mono<T>> fn) { | |
final AsyncLoadingCache<String, T> cache = Caffeine.newBuilder() | |
.expireAfterWrite(duration.multipliedBy(2)) | |
.refreshAfterWrite(duration) | |
.buildAsync((k, e) -> | |
fn.apply(k) | |
.subscribeOn(Schedulers.fromExecutor(e)) | |
.toFuture()); | |
return (k) -> Mono.fromFuture(cache.get(k)); | |
} | |
public static <T> Mono<T> ofMonoFixedKey(@NotNull Duration duration, @NotNull Mono<T> mono) { | |
Function<String, Mono<T>> monoFn = ofMono(duration, key -> mono); | |
return Mono.defer(() -> monoFn.apply(FIXED_KEY)); | |
} | |
} |
import org.junit.jupiter.api.Test; | |
import reactor.core.publisher.Mono; | |
import reactor.test.StepVerifier; | |
import java.time.Duration; | |
import java.util.Random; | |
import java.util.concurrent.ThreadLocalRandom; | |
import java.util.function.Function; | |
import java.util.function.Supplier; | |
import static org.assertj.core.api.Assertions.assertThat; | |
class CachingUtilsTest { | |
@Test | |
void testSyncCaching() { | |
Random random = new Random(); | |
Function<String, Integer> fn = (k) -> random.nextInt(); | |
Function<String, Integer> wrappedFn = CachingUtils.of(Duration.ofSeconds(10), fn); | |
int result1 = wrappedFn.apply("key1"); | |
assertThat(wrappedFn.apply("key1")).isEqualTo(result1); | |
assertThat(wrappedFn.apply("key1")).isEqualTo(result1); | |
assertThat(wrappedFn.apply("key2")).isNotEqualTo(result1); | |
} | |
@Test | |
void testSupplierCaching() { | |
Random random = new Random(); | |
Supplier<Integer> fn = () -> random.nextInt(); | |
Supplier<Integer> wrappedSupplier = CachingUtils.of(Duration.ofSeconds(60), fn); | |
int result1 = wrappedSupplier.get(); | |
assertThat(wrappedSupplier.get()).isEqualTo(result1); | |
assertThat(wrappedSupplier.get()).isEqualTo(result1); | |
} | |
private Mono<String> get(String key) { | |
Random random = ThreadLocalRandom.current(); | |
return Mono.fromSupplier(() -> key + random.nextInt()); | |
} | |
@Test | |
void testMonoCaching() { | |
Function<String, Mono<String>> fn = (k) -> get(k); | |
Function<String, Mono<String>> wrappedFn = CachingUtils.ofMono(Duration.ofSeconds(10), fn); | |
StepVerifier.create(wrappedFn.apply("key1")) | |
.assertNext(result1 -> { | |
StepVerifier.create(wrappedFn.apply("key1")) | |
.assertNext(result2 -> { | |
assertThat(result2).isEqualTo(result1); | |
}) | |
.verifyComplete(); | |
StepVerifier.create(wrappedFn.apply("key1")) | |
.assertNext(result2 -> { | |
assertThat(result2).isEqualTo(result1); | |
}) | |
.verifyComplete(); | |
StepVerifier.create(wrappedFn.apply("key2")) | |
.assertNext(result2 -> { | |
assertThat(result2).isNotEqualTo(result1); | |
}) | |
.verifyComplete(); | |
}) | |
.verifyComplete(); | |
} | |
@Test | |
void testMonoCachingFixedKey() { | |
Random random = new Random(); | |
Mono<Integer> mono = Mono.fromCallable(() -> random.nextInt()); | |
Mono<Integer> wrapped = CachingUtils.ofMonoFixedKey(Duration.ofSeconds(10), mono); | |
StepVerifier.create(wrapped) | |
.assertNext(result1 -> { | |
StepVerifier.create(wrapped) | |
.assertNext(result2 -> { | |
assertThat(result2).isEqualTo(result1); | |
}) | |
.verifyComplete(); | |
}) | |
.verifyComplete(); | |
} | |
} |
Why is the cache created on each call https://gist.github.com/bijukunjummen/a12ab5d3e823c5f052ce608b5fc7b6a4#file-cachingutils-java-L32 , or is that just a creation and the resulting mono should always be reused?
Shouldn't it be created once so values can be cached?
Hi @checketts, the expectation is that a caller invokes the factory function just once; the cache is created internally and is then used to cache subsequent calls. The factory takes in a function and returns a cache-wrapped function, and from that point on the wrapped function should be used.
You can use AsyncCache
so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters:
/**
 * Wraps a Mono-returning function with an AsyncCache whose entries expire {@code duration} after
 * write. The cache stores the future for each key, so callers asking for a key obtain the future
 * held by the cache for that key.
 *
 * @param duration time after which a cached entry expires, counted from when it was written
 * @param fn       function that produces the Mono to load a key's value
 * @return a function whose Mono emits the cached value for the key
 */
public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) {
    AsyncCache<String, T> cache = Caffeine.newBuilder()
            .expireAfterWrite(duration)
            .buildAsync();
    // The (key, executor) form is the overload whose loader returns a CompletableFuture;
    // the single-argument form expects the loader to return the value itself.
    return key -> Mono.fromFuture(cache.get(key, (k, executor) -> fn.apply(k).toFuture()));
}
Thanks for the reminder @ben-manes . I've even used that feature before! :D
You can use
AsyncCache
so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters,public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) { AsyncCache<String, T> cache = Caffeine.newBuilder() .expireAfterWrite(duration) .buildAsync(); return key -> Mono.fromFuture(cache.get(key, k -> fn.apply(k).toFuture())); }
Awesome!! thanks Ben-manes! Yes, makes sense to use the AsyncCache version, I was indeed running into cache stampede with the sync cache. I have modified the gist now.
Love Caffeine by the way and thank you for creating/maintaining it.
You can use
AsyncCache
so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters,public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) { AsyncCache<String, T> cache = Caffeine.newBuilder() .expireAfterWrite(duration) .buildAsync(); return key -> Mono.fromFuture(cache.get(key, k -> fn.apply(k).toFuture())); }
/**
 * Returns a cache-backed version of {@code fn}. Values are held in an AsyncCache whose entries
 * expire {@code duration} after they are written; a key's value is loaded by converting the Mono
 * from {@code fn} into a future.
 *
 * @param duration expire-after-write interval for cached entries
 * @param fn       loader that yields a Mono for a given key
 * @return a function that looks the key up in the cache and exposes the result as a Mono
 */
public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) {
    final AsyncCache<String, T> store = Caffeine.newBuilder().expireAfterWrite(duration).buildAsync();
    return cacheKey -> Mono.fromFuture(
            store.get(cacheKey, (loadKey, ignoredExecutor) -> fn.apply(loadKey).toFuture()));
}
Why is the cache created on each call https://gist.github.com/bijukunjummen/a12ab5d3e823c5f052ce608b5fc7b6a4#file-cachingutils-java-L32 , or is that just a creation and the resulting mono should always be reused?
Shouldn't it be created once so values can be cached?