Skip to content

Instantly share code, notes, and snippets.

@bijukunjummen
Last active November 27, 2023 05:27
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save bijukunjummen/a12ab5d3e823c5f052ce608b5fc7b6a4 to your computer and use it in GitHub Desktop.
Save bijukunjummen/a12ab5d3e823c5f052ce608b5fc7b6a4 to your computer and use it in GitHub Desktop.
Demonstration of caching with Project Reactor and Caffeine
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.function.Function;
import java.util.function.Supplier;
public class CachingUtils {
private static final String FIXED_KEY = "FIXED_KEY";
private CachingUtils() {
}
public static <T> Function<String, T> of(@NotNull Duration duration, @NotNull Function<String, T> fn) {
final LoadingCache<String, T> cache = Caffeine.newBuilder()
.expireAfterWrite(duration)
.build((String k) -> fn.apply(k));
return (String key) -> cache.get(key);
}
public static <T> Supplier<T> of(@NotNull Duration duration, @NotNull Supplier<T> supplier) {
Function<String, T> fn = of(duration, k -> supplier.get());
return () -> fn.apply(FIXED_KEY);
}
public static <T> Function<String, Mono<T>> ofMono(@NotNull Duration duration, @NotNull Function<String, Mono<T>> fn) {
final AsyncLoadingCache<String, T> cache = Caffeine.newBuilder()
.expireAfterWrite(duration.multipliedBy(2))
.refreshAfterWrite(duration)
.buildAsync((k, e) ->
fn.apply(k)
.subscribeOn(Schedulers.fromExecutor(e))
.toFuture());
return (k) -> Mono.fromFuture(cache.get(k));
}
public static <T> Mono<T> ofMonoFixedKey(@NotNull Duration duration, @NotNull Mono<T> mono) {
Function<String, Mono<T>> monoFn = ofMono(duration, key -> mono);
return Mono.defer(() -> monoFn.apply(FIXED_KEY));
}
}
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.assertj.core.api.Assertions.assertThat;
class CachingUtilsTest {
@Test
void testSyncCaching() {
Random random = new Random();
Function<String, Integer> fn = (k) -> random.nextInt();
Function<String, Integer> wrappedFn = CachingUtils.of(Duration.ofSeconds(10), fn);
int result1 = wrappedFn.apply("key1");
assertThat(wrappedFn.apply("key1")).isEqualTo(result1);
assertThat(wrappedFn.apply("key1")).isEqualTo(result1);
assertThat(wrappedFn.apply("key2")).isNotEqualTo(result1);
}
@Test
void testSupplierCaching() {
Random random = new Random();
Supplier<Integer> fn = () -> random.nextInt();
Supplier<Integer> wrappedSupplier = CachingUtils.of(Duration.ofSeconds(60), fn);
int result1 = wrappedSupplier.get();
assertThat(wrappedSupplier.get()).isEqualTo(result1);
assertThat(wrappedSupplier.get()).isEqualTo(result1);
}
private Mono<String> get(String key) {
Random random = ThreadLocalRandom.current();
return Mono.fromSupplier(() -> key + random.nextInt());
}
@Test
void testMonoCaching() {
Function<String, Mono<String>> fn = (k) -> get(k);
Function<String, Mono<String>> wrappedFn = CachingUtils.ofMono(Duration.ofSeconds(10), fn);
StepVerifier.create(wrappedFn.apply("key1"))
.assertNext(result1 -> {
StepVerifier.create(wrappedFn.apply("key1"))
.assertNext(result2 -> {
assertThat(result2).isEqualTo(result1);
})
.verifyComplete();
StepVerifier.create(wrappedFn.apply("key1"))
.assertNext(result2 -> {
assertThat(result2).isEqualTo(result1);
})
.verifyComplete();
StepVerifier.create(wrappedFn.apply("key2"))
.assertNext(result2 -> {
assertThat(result2).isNotEqualTo(result1);
})
.verifyComplete();
})
.verifyComplete();
}
@Test
void testMonoCachingFixedKey() {
Random random = new Random();
Mono<Integer> mono = Mono.fromCallable(() -> random.nextInt());
Mono<Integer> wrapped = CachingUtils.ofMonoFixedKey(Duration.ofSeconds(10), mono);
StepVerifier.create(wrapped)
.assertNext(result1 -> {
StepVerifier.create(wrapped)
.assertNext(result2 -> {
assertThat(result2).isEqualTo(result1);
})
.verifyComplete();
})
.verifyComplete();
}
}
@checketts
Copy link

checketts commented Mar 24, 2021

Why is the cache created on each call https://gist.github.com/bijukunjummen/a12ab5d3e823c5f052ce608b5fc7b6a4#file-cachingutils-java-L32 , or is that just a creation and the resulting mono should always be reused?

Shouldn't it be created once so values can be cached?

@bijukunjummen
Copy link
Author

Why is the cache created on each call https://gist.github.com/bijukunjummen/a12ab5d3e823c5f052ce608b5fc7b6a4#file-cachingutils-java-L32 , or is that just a creation and the resulting mono should always be reused?

Shouldn't it be created once so values can be cached?

Hi @checketts, the expectation from a caller is to call to function just once, the cache is created internally and this cache is then used for caching subsequent calls. So it would take in a function and return a cache wrapped function. From that point on the wrapped function should be used.

@ben-manes
Copy link

You can use AsyncCache so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters,

public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) {
    AsyncCache<String, T> cache = Caffeine.newBuilder()
        .expireAfterWrite(duration)
        .buildAsync();
    return key -> Mono.fromFuture(cache.get(key, k -> fn.apply(k).toFuture()));
}

@checketts
Copy link

Thanks for the reminder @ben-manes . I've even used that feature before! :D

@bijukunjummen
Copy link
Author

You can use AsyncCache so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters,

public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) {
    AsyncCache<String, T> cache = Caffeine.newBuilder()
        .expireAfterWrite(duration)
        .buildAsync();
    return key -> Mono.fromFuture(cache.get(key, k -> fn.apply(k).toFuture()));
}

Awesome!! thanks Ben-manes! Yes, makes sense to use the AsyncCache version, I was indeed running into cache stampede with the sync cache. I have modified the gist now.
Love Caffeine by the way and thank you for creating/maintaining it.

@shareisall
Copy link

You can use AsyncCache so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters,

public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) {
    AsyncCache<String, T> cache = Caffeine.newBuilder()
        .expireAfterWrite(duration)
        .buildAsync();
    return key -> Mono.fromFuture(cache.get(key, k -> fn.apply(k).toFuture()));
}
public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) {
    AsyncCache<String, T> cache = Caffeine.newBuilder()
        .expireAfterWrite(duration)
        .buildAsync();
    return key -> Mono.fromFuture(cache.get(key, (k, executor) -> fn.apply(k).toFuture()));
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment