Skip to content

Instantly share code, notes, and snippets.

@eugene-sadovsky
Created November 14, 2023 11:02
Show Gist options
  • Save eugene-sadovsky/46e75836c017652cb03fb3f27cf06e9e to your computer and use it in GitHub Desktop.
Save eugene-sadovsky/46e75836c017652cb03fb3f27cf06e9e to your computer and use it in GitHub Desktop.
Exponential retry
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.stream.IntStream;
@Slf4j
public class BackoffRetry {
private static final Duration CHECK_INTERVAL = Duration.ofMillis(500);
public static void main(String[] args) {
Disposable subscription = Flux.interval(CHECK_INTERVAL)
.doOnSubscribe((s) -> log.info("Scheduled check every {}", CHECK_INTERVAL))
.log(log.getName(), Level.FINEST)
.concatMap((i) -> checkAllInstances(i))
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.doBeforeRetry((s) -> log.warn("Unexpected error in status check. Retries {} / {}",
s.totalRetries(),
s.totalRetriesInARow(),
s.failure())))
.subscribe();
try {
TimeUnit.HOURS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static Mono<Void> checkAllInstances(Long tick) {
log.info("running check {}", tick);
List<Integer> items = IntStream.range(0, 5).boxed().toList();
return Flux.fromIterable(items).flatMap(idx -> {
log.info("flatMap {}", idx);
return randomlySlowMono(0.1);
}).then();
}
private static Mono<Void> randomlySlowMono(double slowProbability) {
if (ThreadLocalRandom.current().nextDouble() <= slowProbability) {
return Mono.just("slow response")
.delayElement(CHECK_INTERVAL.plus(Duration.ofMillis(100)))
.then();
} else {
return Mono.empty();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment