Skip to content

Instantly share code, notes, and snippets.

@diversit
Last active July 16, 2020 12:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save diversit/add7524def84fe20f9cfd73db5728ff2 to your computer and use it in GitHub Desktop.
Save diversit/add7524def84fe20f9cfd73db5728ff2 to your computer and use it in GitHub Desktop.
In Java, sequence a List<CompletableFuture<T>> to a CompletableFuture<List<T>> fully non-blocking
/**
* True non-blocking sequence implementation.
* Does _NOT_ use 'join' or 'get'.
* Creates a new CompletableFuture (a.k.a. a promise) which is completed only
* once all futures have completed, either successfully or with a failure.
* Failures are ignored, but they could also be accumulated if the result type were not a List
* but, for example, a SequenceResult containing both the list of results and the list of errors;
* such a class could even implement List.
*/
package eu.allego.ag.authoriser;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Sequences a list of {@link CompletableFuture}s into a single future of their results
 * without blocking any thread, together with a JUnit test demonstrating the behaviour.
 */
public class JavaCompletableFuturesSequencer {

    /**
     * Turns a list of futures into one future holding the successfully produced values.
     *
     * <p>The returned future is completed once every input future has completed, either
     * normally or exceptionally. Values of normally completed futures are collected in
     * <em>completion</em> order (not input order); futures that completed exceptionally are
     * skipped. This method never completes the returned future exceptionally. For an empty
     * input the returned future is already completed with an empty list.
     *
     * @param futures the futures to wait for
     * @param <T>     the value type of the futures
     * @return a future completed with the values of all successfully completed input futures
     */
    public <T> CompletableFuture<List<T>> sequence(final List<CompletableFuture<T>> futures) {
        final CompletableFuture<List<T>> promise = new CompletableFuture<>();
        final int total = futures.size();
        if (total == 0) {
            // No callback would ever fire for an empty input, so complete right away
            // instead of handing out a future that never finishes.
            promise.complete(new ArrayList<>());
            return promise;
        }

        final AtomicInteger counter = new AtomicInteger();
        final List<T> results = new ArrayList<>(total);
        for (final CompletableFuture<T> future : futures) {
            future.whenComplete((ok, error) -> {
                if (error == null) {
                    // save result; callbacks may run on different threads, so guard the list
                    synchronized (results) {
                        results.add(ok);
                    }
                }
                // Count successes and failures alike: only the callback that observes
                // the final count completes the promise.
                final int totalDone = counter.incrementAndGet();
                if (totalDone == total) {
                    // complete when all futures are done
                    // TODO Could also fail result when none of the futures completed successfully.
                    promise.complete(results);
                }
            });
        }
        return promise;
    }

    /**
     * Verifies that successful values are gathered in completion order and that the
     * failing future is ignored. The expected order relies on the sleep durations
     * passed to the helper methods below.
     */
    @Test
    public void testSequence() throws InterruptedException, ExecutionException, TimeoutException {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                returnEventually(1, 10),
                returnEventually(2, 0),
                returnEventually(3, 100),
                failEventually("fail 1", 200),
                returnEventually(4, 250)
        );
        CompletableFuture<List<Integer>> sequence = sequence(futures);
        List<Integer> result = sequence.get(200, TimeUnit.SECONDS); // wait until done
        Assert.assertEquals(Arrays.asList(2, 1, 3, 4), result);
    }

    /**
     * Creates a future that asynchronously yields {@code value} after sleeping.
     *
     * @param value     the value to complete with
     * @param afterTime the time to sleep before completing, in milliseconds
     * @return the asynchronously completed future
     */
    private CompletableFuture<Integer> returnEventually(final int value, final long afterTime) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(afterTime);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executing thread, then finish early.
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    /**
     * Creates a future that asynchronously fails with a {@link RuntimeException} after sleeping.
     *
     * @param msg       the message of the thrown exception
     * @param afterTime the time to sleep before failing, in milliseconds
     * @return the asynchronously failing future
     */
    private CompletableFuture<Integer> failEventually(String msg, final long afterTime) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(afterTime);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executing thread, then fail early.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(msg);
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment