Skip to content

Instantly share code, notes, and snippets.

@samuraisam
Created June 13, 2020 19:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samuraisam/b89b0aa841ee7a663357505b0f5f5782 to your computer and use it in GitHub Desktop.
Save samuraisam/b89b0aa841ee7a663357505b0f5f5782 to your computer and use it in GitHub Desktop.
package org.brd.blockchaindb.chaindata.cassandra.cql;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
/**
 * Retries asynchronous operations expressed as {@link CompletionStage}s.
 *
 * <p>An operation is supplied as a {@code Supplier<CompletionStage<T>>}; each attempt calls the
 * supplier again. When an attempt fails with an instance of the caller-chosen retryable type
 * (directly, or as the cause of a {@link CompletionException}), a new attempt is scheduled on
 * {@link #retryExecutor} after a short delay. There is no upper bound on the number of attempts:
 * the returned stage only completes once an attempt succeeds or fails with a non-retryable error.
 *
 * <p>When {@link #tracingEnabled} is set, the counters below are maintained and can be written
 * to the log with {@link #logStats()}.
 */
@Slf4j
public class CompletionStageExtras {
    /** Number of calls to {@link #retry(Class, Supplier)} (only counted while tracing). */
    AtomicInteger submitted = new AtomicInteger(0);
    /** Number of attempts currently in flight (only counted while tracing). */
    AtomicInteger operations = new AtomicInteger(0);
    /** Number of attempts that completed normally (only counted while tracing). */
    AtomicInteger successes = new AtomicInteger(0);
    /** Number of attempts that completed exceptionally (only counted while tracing). */
    AtomicInteger failures = new AtomicInteger(0);
    /** Number of attempts that were rescheduled (only counted while tracing). */
    AtomicInteger retries = new AtomicInteger(0);
    /** Single-threaded scheduler used to delay and start retry attempts. */
    ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
    /** Enables the counters above; volatile because it is read from completion threads. */
    volatile boolean tracingEnabled = false;

    /**
     * Runs the supplied asynchronous operation, re-running it for as long as it fails with
     * {@code retryable}.
     *
     * @param retryable exception type that triggers another attempt; matched against the failure
     *                  itself and against the cause of a {@link CompletionException}
     * @param supplier  starts one attempt each time it is called
     * @param <T>       result type of the operation
     * @return a stage completing with the first successful result, or exceptionally with the
     *         first non-retryable failure (surfaced as a {@link CompletionException})
     */
    public <T> CompletionStage<T> retry(Class<? extends Throwable> retryable, Supplier<CompletionStage<T>> supplier) {
        if (tracingEnabled) {
            submitted.incrementAndGet();
        }
        return retryInternal(0, retryable, supplier);
    }

    /**
     * Performs one attempt and, if it fails with a retryable error, schedules the next one.
     *
     * @param retryCount number of retries already performed; drives the back-off delay
     */
    private <T> CompletionStage<T> retryInternal(int retryCount,
                                                 Class<? extends Throwable> retryable,
                                                 Supplier<CompletionStage<T>> supplier) {
        // Snapshot the flag so the increment/decrement of `operations` for this attempt stay
        // paired even if tracing is toggled while the attempt is in flight.
        final boolean tracing = tracingEnabled;
        if (tracing) {
            operations.incrementAndGet();
        }
        CompletionStage<CompletionStage<T>> outcome = startAttempt(supplier).handle((ret, throwable) -> {
            if (tracing) {
                operations.decrementAndGet();
                // Success is only counted while tracing; previously it was counted on every
                // attempt whenever tracing was off.
                if (throwable != null) {
                    failures.incrementAndGet();
                } else {
                    successes.incrementAndGet();
                }
            }
            if (throwable == null) {
                return CompletableFuture.completedFuture(ret);
            }
            if (!isRetryable(retryable, throwable)) {
                // Avoid wrapping a CompletionException inside another CompletionException.
                throw throwable instanceof CompletionException
                        ? (CompletionException) throwable
                        : new CompletionException(throwable);
            }
            if (tracing) {
                retries.incrementAndGet();
            }
            return scheduleAsync(() -> retryInternal(retryCount + 1, retryable, supplier),
                    retryDelayMillis(retryCount));
        });
        // Flatten the nested stage produced by the handler.
        return outcome.thenCompose(Function.identity());
    }

    /**
     * Calls the supplier, turning a synchronous exception or a {@code null} return into a failed
     * stage so that it goes through the same accounting and retry decision as an async failure.
     */
    private static <T> CompletionStage<T> startAttempt(Supplier<CompletionStage<T>> supplier) {
        try {
            CompletionStage<T> stage = supplier.get();
            if (stage == null) {
                throw new NullPointerException("supplier returned a null CompletionStage");
            }
            return stage;
        } catch (RuntimeException e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    /** Tells whether the failure, or the cause of a wrapping CompletionException, is retryable. */
    private static boolean isRetryable(Class<? extends Throwable> retryable, Throwable throwable) {
        if (retryable.isInstance(throwable)) {
            return true;
        }
        return throwable instanceof CompletionException && retryable.isInstance(throwable.getCause());
    }

    /** Delay before the next attempt: floor(1.5 * max(retryCount, 1)) milliseconds. */
    private static long retryDelayMillis(int retryCount) {
        return Math.round(Math.floor(((double) Math.max(retryCount, 1)) * 1.5));
    }

    /**
     * Starts {@code supplier} on {@link #retryExecutor} after {@code delayInMs} milliseconds and
     * mirrors its outcome into the returned stage.
     */
    private <T> CompletionStage<T> scheduleAsync(Supplier<CompletionStage<T>> supplier,
                                                 long delayInMs) {
        CompletableFuture<T> completable = new CompletableFuture<>();
        retryExecutor.schedule(() -> {
            try {
                supplier.get().whenComplete((value, error) -> {
                    if (error != null) {
                        completable.completeExceptionally(error);
                    } else {
                        completable.complete(value);
                    }
                });
            } catch (RuntimeException e) {
                // Without this the exception would be captured by the discarded ScheduledFuture
                // and `completable` would never complete, hanging the caller.
                completable.completeExceptionally(e);
            }
        }, delayInMs, TimeUnit.MILLISECONDS);
        return completable;
    }

    /**
     * Requests an orderly shutdown of {@link #retryExecutor} so its worker thread can terminate.
     * Retries that need to be scheduled afterwards are rejected by the executor and fail the
     * corresponding stage.
     */
    public void shutdown() {
        retryExecutor.shutdown();
    }

    /** Logs the current values of the tracing counters at INFO level. */
    public void logStats() {
        log.info("active={} submitted={} succeeded={} failed={} retried={}",
                operations.get(), submitted.get(), successes.get(), failures.get(), retries.get());
    }
}
@samuraisam
Copy link
Author

Used like this:

    /**
     * Executes a statement once its prepared form is available, retrying through
     * {@code extras.retry} whenever the execution fails with {@code NoNodeAvailableException}.
     *
     * @param preparedStatement stage yielding the prepared statement to bind
     * @param binder            binds values to the prepared statement; invoked for every attempt
     * @return stage completing with the result of {@code session.executeAsync}
     */
    public CompletionStage<AsyncResultSet> execute(CompletionStage<PreparedStatement> preparedStatement,
                                                   StatementBinder binder) {
        return preparedStatement.thenCompose(prepared -> {
            // The bind happens inside the supplier, so each attempt binds the statement again.
            return extras.retry(
                    NoNodeAvailableException.class,
                    () -> session.executeAsync(binder.bind(prepared)));
        });
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment