Last active
July 16, 2020 12:18
-
-
Save diversit/add7524def84fe20f9cfd73db5728ff2 to your computer and use it in GitHub Desktop.
In Java, sequence a List<CompletableFuture<T>> to a CompletableFuture<List<T>> fully non-blocking
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * True non-blocking sequence implementation.
 * Does _NOT_ use 'join' or 'get'.
 * Creates a new CompletableFuture (aka a promise) which is only completed
 * once all futures have completed, either successfully or with failures.
 * Failures are ignored, but they could also be accumulated if the result type were not a List
 * but, for example, a SequenceResult containing both the list of results and the list of errors;
 * such a class could even implement List itself.
 */
package eu.allego.ag.authoriser; | |
import org.junit.Assert; | |
import org.junit.Test; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
import java.util.concurrent.atomic.AtomicInteger; | |
public class JavaCompletableFuturesSequencer { | |
public <T> CompletableFuture<List<T>> sequence(final List<CompletableFuture<T>> futures) { | |
CompletableFuture completableFuture = new CompletableFuture(); | |
final int total = futures.size(); | |
final AtomicInteger counter = new AtomicInteger(); | |
final List<T> results = new ArrayList<>(futures.size()); | |
futures.stream() | |
.forEach(f -> f.whenComplete((ok, error) -> { | |
if (error == null) { | |
// save result | |
synchronized (results) { | |
results.add(ok); | |
} | |
} | |
int totalDone = counter.incrementAndGet(); | |
if (totalDone == total) { | |
// complete when all futures are done | |
// TODO Could also fail result when non of the futures completed successfully. | |
completableFuture.complete(results); | |
} | |
})); | |
return completableFuture; | |
} | |
@Test | |
public void testSequence() throws InterruptedException, ExecutionException, TimeoutException { | |
List<CompletableFuture<Integer>> futures = Arrays.asList( | |
returnEventually(1, 10), | |
returnEventually(2, 0), | |
returnEventually(3, 100), | |
failEventually("fail 1", 200), | |
returnEventually(4, 250) | |
); | |
CompletableFuture<List<Integer>> sequence = sequence(futures); | |
List<Integer> result = sequence.get(200, TimeUnit.SECONDS);// wait until done | |
Assert.assertEquals(Arrays.asList(2,1,3,4), result); | |
} | |
private CompletableFuture<Integer> returnEventually(final int value, final long afterTime) { | |
return CompletableFuture.supplyAsync(() -> { | |
try { | |
Thread.sleep(afterTime); | |
} catch (InterruptedException e) { | |
} | |
return value; | |
}); | |
} | |
private CompletableFuture<Integer> failEventually(String msg, final long afterTime) { | |
return CompletableFuture.supplyAsync(() -> { | |
try { | |
Thread.sleep(afterTime); | |
} catch (InterruptedException e) { | |
} | |
throw new RuntimeException(msg); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment