Skip to content

Instantly share code, notes, and snippets.

@purijatin
Last active August 29, 2015 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save purijatin/d5e05d84941c3c408282 to your computer and use it in GitHub Desktop.
Save purijatin/d5e05d84941c3c408282 to your computer and use it in GitHub Desktop.
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
/**
* Transforms a {@code List<CompletableFuture<T>>} into a {@code CompletableFuture<List<T>>}.
* Useful for reducing many Futures into a single Future.
* <p>
* If any of the future fails, it returns a failed future with the exception as that of the failed one.
* </p>
* <p>
* The order of the returned list (when returned future is completed) is undisturbed.
* i.e. the same as its future counterpart in the argument list.
* </p>
* @param com a list of completable future's
* @param <T> The type of list
* @return future containing the list of results. Else failed future with the exception
*/
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(toList())
);
}
/**
* See {@link Util#sequence(List)} for more info
* @see Util#sequence(List)
*/
@SafeVarargs
public static <T> CompletableFuture<List<T>> sequence(CompletableFuture<T>... com) {
return sequence(Arrays.asList(com));
}
public class UtilTest {
private ExecutorService exec = Executors.newCachedThreadPool();
private ScheduledExecutorService sched = Executors.newScheduledThreadPool(200);
@Test
public void testSequence1() throws Exception {
//test normal usage
//single task quickly finish
List<CompletableFuture<Integer>> in1 = rangeFuture(1, 2, 0);
CompletableFuture<List<Integer>> ls1 = Util.sequence(in1);
Assert.assertEquals(range(1, 2), ls1.get(2, SECONDS));
//single task async finish
List<CompletableFuture<Integer>> in2 = rangeFuture(1, 2, 500);
CompletableFuture<List<Integer>> ls2 = Util.sequence(in2);
Assert.assertEquals(range(1, 2), ls2.get(2, SECONDS));
//all over quickly and order is maintained
List<CompletableFuture<Integer>> in3 = rangeFuture(1, 70000, 0);
CompletableFuture<List<Integer>> ls3 = Util.sequence(in3);
Assert.assertEquals(range(1, 70000), ls3.get(2, SECONDS));
//all over async and order is maintained
List<CompletableFuture<Integer>> in4 = rangeFuture(1, 70000, 500);
CompletableFuture<List<Integer>> ls4 = Util.sequence(in4);
Assert.assertEquals(range(1, 70000), ls4.get(20, SECONDS));
}
@Test
public void testSequence2() throws Exception {
//test if any fail
//a test fails
List<CompletableFuture<Integer>> in = asList(failure(1000));
CompletableFuture<List<Integer>> ls = Util.sequence(in);
while(!ls.isDone());
Assert.assertTrue(ls.isCompletedExceptionally());
//no test fails.
List<CompletableFuture<Integer>> in1 = mixed(100, 1000, 0);
CompletableFuture<List<Integer>> ls1 = Util.sequence(in1);
while(!ls1.isDone());
Assert.assertFalse("Should not end exceptionally", ls1.isCompletedExceptionally());
List<CompletableFuture<Integer>> in2 = mixed(50000, 500, 0.25);
CompletableFuture<List<Integer>> ls2 = Util.sequence(in2);
while(!ls2.isDone());
Assert.assertTrue("Should end exceptionally", ls2.isCompletedExceptionally());
}
// @Test(expected = RuntimeException.class, timeout = 10000)
public void testSequence3() throws Exception{
//as soon as any test fails, it shouldnt wait for others to terminate and give result asap
CompletableFuture<Integer> success = new CompletableFuture<>();
CompletableFuture<Integer> fail = new CompletableFuture<>();
CompletableFuture<List<Integer>> out = Util.sequence(asList(success(1, 0), fail, success));
fail.completeExceptionally(new RuntimeException());
out.get();
}
private final Random random = new Random();
private <T> CompletableFuture<T> success(T value, int maxSleep) {
if (maxSleep == 0) {
return CompletableFuture.completedFuture(value);
}
CompletableFuture<T> c = new CompletableFuture<T>();
sched.schedule(() -> c.complete(value), random.nextInt(maxSleep), MILLISECONDS);
return c;
}
private <T> CompletableFuture<T> failure(Exception ex, int maxSleep) {
CompletableFuture<T> com = new CompletableFuture<>();
if (maxSleep == 0) {
com.completeExceptionally(ex);
} else {
CompletableFuture<T> c = new CompletableFuture<T>();
sched.schedule(() -> com.completeExceptionally(ex), random.nextInt(maxSleep), MILLISECONDS);
}
return com;
}
private <T> CompletableFuture<T> failure(int maxSleep) {
return failure(new RuntimeException("Auto-Failure. " + System.nanoTime()), maxSleep);
}
private List<CompletableFuture<Integer>> rangeFuture(int from, int to, int maxSleep) {
return IntStream.range(from, to)
.mapToObj(x -> success(x, maxSleep))
.collect(toList());
}
/**
* returns a list of futures. that would fail or succeed
*
* @param count number of futures
* @param maxSleep max time taken for any task to be completed. Time taken will be less than this
* @param failfrequency frequency on how many futures should fail. if 0 then all success. if 1 then all fail
* @return ls
*/
private List<CompletableFuture<Integer>> mixed(int count, int maxSleep, double failfrequency) {
if (failfrequency < 0 | failfrequency > 1)
throw new IllegalArgumentException("Frequence can only be between 0 and 1");
return IntStream.range(0, count)
.mapToObj(x -> {
if (random.nextDouble() < failfrequency)
return this.<Integer>failure(maxSleep);
else return success(1, maxSleep);
})
.collect(toList());
}
/**
* returns a list of integers
*
* @param from inclusive
* @param to exclusive
* @return list of integers
*/
private List<Integer> range(int from, int to) {
return IntStream.range(from, to).boxed().collect(toList());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment