Skip to content

Instantly share code, notes, and snippets.

@purijatin
Created September 4, 2017 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save purijatin/0f5ba38722436018802f22fd6eb98a63 to your computer and use it in GitHub Desktop.
Save purijatin/0f5ba38722436018802f22fd6eb98a63 to your computer and use it in GitHub Desktop.
Get started with CompletableFuture Session
/**
* A Future represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Additional methods are provided to
* determine if the task completed normally or was cancelled.
*
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
CompletableFuture is a:
- Future
- CompletionStage
It is a class to do asynchronous computation. To rephrase:
• A synchronous call waits till the operation completes.
• An asynchronous call does not wait but only initiates the operation and it will be notified in the future once the operation is completed.
Although with asynchronous, the operation could either be completed by someone else or by the process/person himself.
When we initiate an asynchronous computation, the result of that computation will be stored in a `Future` object. Though at any given moment, Future may or may not contain the final result.
• If the computation is successfully completed, the future object will contain the final value
• If the computation is not yet completed, the future object will not contain the result
• If the computation fails, the future object will contain the cause for the failure (Throwable)
The result whether success/failure becomes available in Future object, once the asynchronous computation is completed.
*/
import com.arcesium.commons.concurrent.util.CompletableFutureUtils;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class CompletableFuturePrac {
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* CompletableFuture is a:
* - Future
* - CompletionStage
*
*/
Future<String> future = someComputationFuture();
CompletableFuture<String> f1 = someComputation();
CompletableFuture<String> f2 = f1.thenApply(x -> "hello -> " + x);
f2
.thenApply(x -> "Log " + CompletableFuturePrac.class + ": " + x)
.handle((String value, Throwable ex) -> {
if (ex != null) {
return "default";
} else {
return value;
}
})
.whenComplete((value, ex) -> {
if (ex != null) {
ex.printStackTrace();
} else {
System.out.println("When Complete " + value);
}
});
/*
consider different scenario:
* We need to read the user name based on id from database
* Google search the username
* Store the results in database
*/
CompletableFuture<List<String>> result = getUserFromDB(1)
.thenCompose(user -> getGoogleUserSearchResults(user));
System.out.println(result.get());
/**
* Exception handling
* - If any of the stage results in failure, the final result is a failure
*/
CompletableFuture<Integer> f6 = failed();
f6.thenApply(x -> {
System.out.println("g6 thenApply called");
return x + 1;
}).thenAccept(System.out::println);
// System.out.println("f6: "+f6.join());
go3();
}
private static void go3() {
/*
* How do we create CompletableFutures? *
*/
//The simplest way when we know the value:
CompletableFuture<String> hello = CompletableFuture.completedFuture("hello");
//in case of exception
CompletableFuture<String> failed = CompletableFutureUtils.failed(new RuntimeException());
//to spawn an async communication: Supplier
CompletableFuture<Integer> supply = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(1000 * 1000 * 1000L);
//do other operations
System.out.println("supply -> Operation running " + Thread.currentThread());
return 100;
});
//to execute an async procedure
CompletableFuture.runAsync(() -> {
LockSupport.parkNanos(1000 * 1000 * 1000L);
//do other operations
System.out.println("run -> Operation completed " + Thread.currentThread());
});
/*
Question: In which pool are the above operations executed?
*/
ExecutorService exec = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(1000 * 1000 * 1000L);
//do other operations
return 100;
}, exec);
CompletableFuture.runAsync(() -> {
LockSupport.parkNanos(1000 * 1000 * 1000L);
//do other operations
System.out.println("Operation completed");
}, exec);
/*
Similarly, we can have control over the executor when dealing with async operations as well
*/
CompletableFuture<Double> sqrt = getSqrt(100);
/*
Thread[main,5,main] - whenComplete - : 10.0
Thread[Voodoo thread,5,main] - whenComplete - : 10.0
Thread[ForkJoinPool.commonPool-worker-3,5,main] - whenComplete - : 10.0
Thread[pool-1-thread-3,5,main] - whenComplete - : 10.0
*/
sqrt.whenComplete(whenCompletePrint());
new Thread(() -> {
sqrt.whenComplete(whenCompletePrint());
}, "Voodoo thread").start();
sqrt.whenCompleteAsync(whenCompletePrint());
sqrt.whenCompleteAsync(whenCompletePrint(), exec);
utils();
}
private static void utils() {
//sequence
List<CompletableFuture<Double>> all = IntStream.range(1, 100)
.mapToObj(x -> getSqrt(x)).collect(Collectors.toList());
CompletableFuture<List<Double>> ans = CompletableFutureUtils.sequence(all);
ans.thenApply(list -> list.stream().mapToDouble(y -> y).sum())
.whenComplete(whenCompletePrint());
//ans would be completed with List(1,2,3)
//travers
}
private static BiConsumer<Double, Throwable> whenCompletePrint() {
return (x, ex) -> {
if (ex != null) {
ex.printStackTrace();
}
System.out.println(Thread.currentThread() + " - whenComplete - : " + x);
};
}
/**
* Returns the user name associated with the user id
*
* @param id the id of the user
* @return
*/
private static CompletableFuture<String> getUserFromDB(int id) {
return CompletableFuture.completedFuture("sachin");
}
private static CompletableFuture<List<String>> getGoogleUserSearchResults(String user) {
return CompletableFuture.completedFuture(Arrays.asList("http://www.cricbuzz.com/profiles/25/sachin-tendulkar", "https://twitter.com/sachin_rt?ref_src=twsrc%5Egoogle%7Ctwcamp%5Eserp%7Ctwgr%5Eauthor",
"https://en.wikipedia.org/wiki/Sachin_Tendulkar"));
}
private static Future<String> someComputationFuture() {
return new FutureTask<>(() -> "1");
}
;
private static CompletableFuture<String> someComputation() {
return CompletableFuture.completedFuture("arcesium");
}
private static CompletableFuture<Double> getSqrt(int number) {
return CompletableFuture.supplyAsync(() -> {
// LockSupport.parkNanos(1000000);
return Math.sqrt(number);
});
}
private static <T> CompletableFuture<T> failed() {
return CompletableFutureUtils.failed(new RuntimeException());
}
private static void go1() {
CompletableFuture<String> future = new CompletableFuture<>();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment