Skip to content

Instantly share code, notes, and snippets.

@acgray
Created October 30, 2018 16:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save acgray/fb7b8eac8979192a2876c985ab4614fa to your computer and use it in GitHub Desktop.
Save acgray/fb7b8eac8979192a2876c985ab4614fa to your computer and use it in GitHub Desktop.
package com.example.bank.impl;
import akka.Done;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SagaManager {
private static final Logger LOG = LoggerFactory.getLogger(SagaManager.class);
static class SagaStep<T> {
String description;
Function<T, CompletionStage<T>> forwards;
Function<T, CompletionStage<T>> backwards;
SagaStep(String description, Function<T, CompletionStage<T>> forwards,
Function<T, CompletionStage<T>> backwards) {
this.description = description;
this.forwards = (v) -> {
LOG.info(description + " - FORWARDS");
return forwards.apply(v);
};
this.backwards = (v) -> {
LOG.info(description + " - BACKWARDS");
return backwards.apply(v);
};
}
}
private final List<SagaStep<Done>> steps;
private final List<SagaStep<Done>> completedSteps = new ArrayList<>();
public SagaManager(List<SagaStep<Done>> steps) {
this.steps = steps;
}
public CompletionStage<Done> begin() {
CompletableFuture<Done> result = CompletableFuture.completedFuture(Done.getInstance());
for (SagaStep<Done> action : steps) {
LOG.info("Composing step " + action.description);
result = result.thenCompose(ifNotNull(done -> {
CompletionStage<Done> next = CompletableFuture.completedFuture(done)
.thenCompose(action.forwards);
next.exceptionally(rollback(action));
next.thenAccept(ifNotNull(() ->
completedSteps.add(0, action)));
return next;
}));
}
return result;
}
private <T> Function<Throwable, Done> rollback(SagaStep<T> failed) {
return cause -> {
LOG.warn("Action {} failed, will roll back", failed.description);
CompletableFuture<Done> rollbackStep = CompletableFuture
.completedFuture(Done.getInstance());
for (SagaStep<Done> step : completedSteps) {
rollbackStep = rollbackStep.thenCompose(step.backwards);
}
return Done.getInstance();
};
}
private static Function<Done, CompletionStage<Done>> ifNotNull(
Function<Done, CompletionStage<Done>> action) {
return maybeDone -> maybeDone == null ? null : action.apply(maybeDone);
}
private static Consumer<Done> ifNotNull(Runnable action) {
return maybeDone -> {
if (maybeDone != null) {
action.run();
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment