A CompletableFuture
object is a placeholder for a result that might not be available
at present but will be available in future.
The CompletableFuture
library provides mechanisms to perform asynchronous
operations. It implements the Future
and the CompletionStage
interfaces.
The computations for obtaining the results need not be asynchronous.
Let us create a transfer()
method that will return a CompletableFuture
object.
import java.util.concurrent.CompletableFuture;
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
return CompletableFuture.completedFuture(true);
}
}
The same effect can be achieved with the following changes in the method.
import java.util.concurrent.CompletableFuture;
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
CompletableFuture<Boolean> isTransferSuccessful = new CompletableFuture<>();
isTransferSuccessful.complete(true);
return isTransferSuccessful;
}
}
Now we want to utilize the transfer()
method and assuming it always succeeds, print
a message.
import bankutils.Accounts;
import bankutils.TransferUtils;
public class TransferController {
public static void main(String args[]) {
TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
() -> System.out.println("The transaction has been completed successfully.")
);
}
}
The thenRun()
method takes a Runnable
that does not take any arguments and
executes it once the value of CompletableFuture
is resolved.
Let us say that our transfer()
method has multiple validations to make and takes some time
to run.
import java.util.concurrent.CompletableFuture;
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
try {
Thread.sleep(5000); // <-- Introduced delay
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
return CompletableFuture.completedFuture(true);
}
}
}
And we change our controller like this.
public class TransferController {
public static void main(String args[]) {
TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
() -> System.out.println("The transaction has been completed successfully.") // <-- line 1
);
System.out.println("Work done. Exiting controller"); // <-- line 2
}
}
What is the output going to be?
Why did the line 2 wait for the line 1 to be executed first?
Let us play with the threads to understand the flow.
Delegated the formatting to a helper method.
public class Helper {
synchronized public static void printWithStatus(String message) { // <-- Notice synchronized
System.out.println("****");
System.out.println(message);
System.out.println("Running thread: " + Thread.currentThread().getId());
System.out.println("Total threads running: " + Thread.activeCount());
System.out.println("****\n");
}
}
import static bankutils.Helper.printWithStatus;
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
printWithStatus("Completing transfer");
return CompletableFuture.completedFuture(true);
}
}
}
public class TransferController {
public static void main(String args[]) {
printWithStatus("Starting Transfer controller"); // <-- line 1
TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
() -> printWithStatus("The transaction has been completed successfully.") // <-- line 2
);
printWithStatus("Work done. Exiting controller"); // <-- line 3
}
}
Let us introduce some delay in the consumer as well.
public class TransferController {
public static void main(String args[]) {
printWithStatus("Starting Transfer controller"); // <-- line 1
TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
printWithStatus("The transaction has been completed successfully."); // <-- line 2
}
}
);
printWithStatus("Work done. Exiting controller"); // <-- line 3
}
}
thenRun()
does not run the consumer asynchronously.
-
Completing the value asynchronously with
supplyAsync()
public class TransferUtils { public static CompletableFuture<Boolean> transfer( Accounts fromAccount, Accounts toAccount, float amount) { return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { printWithStatus("Completing transfer"); return true; } }); } }
public class TransferController { public static void main(String args[]) { printWithStatus("Starting Transfer controller"); // <-- line 1 TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun( () -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { printWithStatus("The transaction has been completed successfully."); // <-- line 2 } } ); printWithStatus("Work done. Exiting controller"); // <-- line 3 waitForThreads(); // <-- Lets wait for all threads to complete } }
public class Helper { public static void waitForThreads() { try { ForkJoinPool.commonPool().awaitTermination(12, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } }
If the consumer method are not asynchronous by themselves, then the same thread that completed (resolved) the
CompletableFuture
will run the function supplied to thethenRun()
method.Most of the times we do not control how the CompletableFuture is resolved.
-
Use
thenRunAsync()
method to make the consuming function asynchronouspublic class TransferController { public static void main(String args[]) { printWithStatus("Starting Transfer controller"); // <-- line 1 TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRunAsync( // <-- Notice this () -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { printWithStatus("The transaction has been completed successfully."); // <-- line 2 } } ); printWithStatus("Work done. Exiting controller"); // <-- line 3 waitForThreads(); } }
Lets go back to our synchronous CompletableFuture
.
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
printWithStatus("Completing transfer");
return CompletableFuture.completedFuture(true);
}
}
}
And add multiple asynchronous computations to the CompletableFuture
public class TransferController {
public static void main(String args[]) {
CompletableFuture<Boolean> transferFuture = TransferUtils.transfer(Accounts.A, Accounts.B, 200);
transferFuture.thenRunAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
printWithStatus("First function completed successfully."); // <-- line 2
});
transferFuture.thenRunAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
printWithStatus("Second function completed successfully."); // <-- line 2
});
waitForThreads(); // <-- Lets wait for all threads to complete
}
}
The order of completion is unpredictable because different threads run different computations
Lets make the computations synchronous
public class TransferController {
public static void main(String args[]) {
CompletableFuture<Boolean> transferFuture = TransferUtils.transfer(Accounts.A, Accounts.B, 200);
transferFuture.thenRun(() -> {
printWithStatus("First function completed successfully."); // <-- line 2
});
transferFuture.thenRun(() -> {
printWithStatus("Second function completed successfully."); // <-- line 2
});
waitForThreads(); // <-- Lets wait for all threads to complete
}
}
There is only a single thread running. So the order is predictable.
Now lets make the CompletableFuture
method asynchronous.
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
printWithStatus("Completing transfer");
return true;
}
});
}
}
Now the computations are run in reverse order.
ComplatableFuture
registers the computations into a stack. Hence the reverse order.
Even if we introduce some delay in computations as in
public class TransferController {
public static void main(String args[]) {
CompletableFuture<Boolean> transferFuture = TransferUtils.transfer(Accounts.A, Accounts.B, 200);
transferFuture.thenRun(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
printWithStatus("First function completed successfully."); // <-- line 2
});
transferFuture.thenRun(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
printWithStatus("Second function completed successfully."); // <-- line 2
});
waitForThreads(); // <-- Lets wait for all threads to complete
}
}
We can see that same thread that has completed the CompletableFuture
runs the computations.
It runs each of the computations in the reverse order, serially, with a single thread.
So, each computation may block (delay) the next computations.
Proceed with caution while adding multiple computations to a single CompletableFuture
.
Avoid if they are dependent on each other.
We want our controller to act based on the result. Lets change our
transfer()
method to
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
return ThreadLocalRandom.current().nextBoolean();
}
});
}
}
and our controller to
public class TransferController {
public static void main(String args[]) throws ExecutionException, InterruptedException {
boolean isTransferSuccessful = TransferUtils.transfer(
Accounts.A, Accounts.B, 200).get(); // <-- Notice This
if (isTransferSuccessful) {
System.out.println("Transfer completed successfully.");
} else {
System.out.println("Transfer failed");
}
}
}
The get()
method is blocking. It waits for the result unless an exception occurs while
resolving the CompletableFuture
. Throws ExecutionException
, InterruptedException
.
public class TransferController { // <-- Notice no Exceptions
public static void main(String args[]) {
boolean isTransferSuccessful = TransferUtils.transfer(
Accounts.A, Accounts.B, 200)
.join(); // <-- Notice This
if (isTransferSuccessful) {
System.out.println("Transfer completed successfully.");
} else {
System.out.println("Transfer failed");
}
}
}
The join()
method behaves similarly, except that it throws unchecked exception.
We can achieve the same result with thenAccept()
or thenAcceptAsync()
.
public class TransferController {
public static void main(String args[]) {
TransferUtils.transfer(Accounts.A, Accounts.B, 200)
.thenAccept((isTransferSuccessful) -> { // <-- Notice
if (isTransferSuccessful) {
System.out.println("Transfer completed successfully.");
} else {
System.out.println("Transfer failed");
}
});
}
}
thenAcceptAsync()
can be used similarly.
public class TransferController {
public static void main(String args[]) {
TransferUtils.transfer(Accounts.A, Accounts.B, 200)
.thenAcceptAsync((isTransferSuccessful) -> { // <-- Notice
if (isTransferSuccessful) {
System.out.println("Transfer completed successfully.");
} else {
System.out.println("Transfer failed");
}
});
waitForThreads();
}
}
All async methods of CompletableFuture
(suffixed with async) can take an Executor
as a second parameter.
public class TransferController {
public static void main(String args[]) throws InterruptedException {
Executor cachedExecutor = Executors.newCachedThreadPool();
TransferUtils.transfer(Accounts.A, Accounts.B, 200)
.thenAcceptAsync((isTransferSuccessful) -> {
if (isTransferSuccessful) {
System.out.println("Transfer completed successfully.");
} else {
System.out.println("Transfer failed");
}
((ExecutorService) cachedExecutor).shutdown();
}, cachedExecutor); // <-- Notice
((ExecutorService) cachedExecutor).awaitTermination(10, TimeUnit.SECONDS);
}
}
Java uses the supplied executor to run the computation asynchronously.
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
CompletableFuture isTransferSuccessful = new CompletableFuture();
ForkJoinPool.commonPool().submit(() -> {
throw new ArithmeticException("Bad values");
});
return isTransferSuccessful;
}
}
If an exception occurs while resolving the CompletableFuture
the then
methods are useless.
To handle such cases, java provides completeExceptionally()
method.
public class TransferUtils {
public static CompletableFuture<Boolean> transfer(
Accounts fromAccount, Accounts toAccount, float amount) {
CompletableFuture isTransferSuccessful = new CompletableFuture();
ForkJoinPool.commonPool().submit(() -> {
isTransferSuccessful.completeExceptionally(new ArithmeticException("Bad values"));
});
return isTransferSuccessful;
}
}
public class TransferController {
public static void main(String args[]) {
TransferUtils.transfer(Accounts.A, Accounts.B, 200)
.thenAcceptAsync((isTransferSuccessful) -> { // <-- Notice
if (isTransferSuccessful) {
System.out.println("Transfer completed successfully.");
} else {
System.out.println("Transfer failed");
}
System.out.println("Operation complete");
});
waitForThreads();
}
}
When the CompletableFuture
completes exceptionally, the Runnable
s supplied to then
methods
are never called.
public class TransferController {
public static void main(String args[]) throws ExecutionException, InterruptedException {
TransferUtils.transfer(Accounts.A, Accounts.B, 200).get();
waitForThreads();
}
}
get()
will throw the ExecutionException
with the provided exception as cause.
public class TransferController {
public static void main(String args[]) {
TransferUtils.transfer(Accounts.A, Accounts.B, 200).join();
waitForThreads();
}
}
Similarly, join()
will embed the cause into an unchecked CompletionExeception
.
But if you want to handle the exception in a futuristic way use whenComplete()
public class TransferController {
public static void main(String args[]) {
CompletableFuture transferFuture = TransferUtils.transfer(Accounts.A, Accounts.B, 200);
transferFuture.whenComplete((amount, exception) -> {
if (exception == null)
System.out.println("Total debited: " + amount);
else
System.out.println("Failed to transfer");
});
waitForThreads();
}
}
The thenApply()
and thenApplyAsync()
can be used to result of a CompletableFuture
public class TransferUtils {
static float TRANSFER_CHARGES = 5;
private static CompletableFuture<Double> transferExcludingCharges(
Accounts fromAccount, Accounts toAccount, Double amount) {
return CompletableFuture.supplyAsync(() -> amount);
}
public static CompletableFuture<Double> transfer(
Accounts fromAccount, Accounts toAccount, Double amount) {
return transferExcludingCharges(fromAccount, toAccount, amount)
.thenApply((transferredAmount) -> transferredAmount * (100 + TRANSFER_CHARGES) / 100);
}
}
And our controller looks likes this.
public class TransferController {
public static void main(String args[]) {
TransferUtils.transfer(Accounts.A, Accounts.B, 200.0)
.thenAccept((amount) -> {
System.out.println("Total debited: " + amount);
});
waitForThreads();
}
}
The same effect can be achieved with thenCompose()
method.
public class TransferUtils {
static float TRANSFER_CHARGES = 5;
private static CompletableFuture<Double> transferExcludingCharges(
Accounts fromAccount, Accounts toAccount, Double amount) {
return CompletableFuture.supplyAsync(() -> amount);
}
public static CompletableFuture<Double> transfer(
Accounts fromAccount, Accounts toAccount, Double amount) {
return transferExcludingCharges(fromAccount, toAccount, amount)
.thenCompose((transferredAmount) -> CompletableFuture.supplyAsync(
() -> transferredAmount * (100 + TRANSFER_CHARGES) / 100));
}
}
The difference is Runnable
supplied to thenCompose()
will return a CompletableFuture
,
where as that of thenApply()
will return an object of same type as result.
Since both thenApply()
and thenCompose()
return a CompletableFuture
,
they can be used to chain the computations, unlike thenRun()
and thenAccept()
.
If we want to work with the results of two CompletableFuture
s we can use thenCombine()
public class TransferUtils {
static float TRANSFER_CHARGES = 5;
private static CompletableFuture<Double> transferExcludingCharges(
Accounts fromAccount, Accounts toAccount, Double amount) {
return CompletableFuture.supplyAsync(() -> amount);
}
private static CompletableFuture<Double> deductTransferFee(Accounts fromAccount, Double amount) {
return transferExcludingCharges(fromAccount, Accounts.C, amount * TRANSFER_CHARGES / 100);
}
public static CompletableFuture<Double> transfer(
Accounts fromAccount, Accounts toAccount, Double amount) {
CompletableFuture<Double> transferFuture = transferExcludingCharges(fromAccount, toAccount, amount);
CompletableFuture<Double> transferChargesFuture = deductTransferFee(fromAccount, amount);
return transferFuture.thenCombine(transferChargesFuture,
(transferAmount, chargesAmount) -> transferAmount + chargesAmount);
}
}
Similarly there are thenCombineBothAsync()
, thenAcceptBoth()
, runAfterBoth()
,
runAfterEither()
, acceptEither()
, applyToEither()
.
public class TransferUtils {
private static CompletableFuture<Boolean> transfer(
String fromAccountNo, String toAccountNumber, Double amount) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(5000) * 1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
return true;
}
});
}
public static CompletableFuture[] transferSalaries() {
String[] employeeAccounts = {"1", "2", "3", "4", "5", "6", "7", "8"};
String companyAccount = "123";
return Arrays.stream(employeeAccounts)
.map(employeeAccount -> transfer(companyAccount, employeeAccount, 1000.0))
.collect(Collectors.toList())
.toArray(new CompletableFuture[employeeAccounts.length]);
}
}
public class TransferController {
public static void main(String args[]) {
CompletableFuture[] transferFutures = TransferUtils.transferSalaries();
CompletableFuture.anyOf(transferFutures)
.thenRun(() -> System.out.println("Transfer started"));
CompletableFuture.allOf(transferFutures)
.thenRun(() -> System.out.println("Transfer complete"));
waitForThreads();
}
}
anyOf()
and allOf()
return a new CompletableFuture that executes when any of the
supplied Futures is complete and all of them are complete respectively.
We can use streams and join()
together to consume the values of a set of CompletableFutures
.
public class TransferController {
public static void main(String args[]) {
CompletableFuture[] transferFutures = TransferUtils.transferSalaries();
CompletableFuture.anyOf(transferFutures)
.thenRun(() -> System.out.println("Transfer started"));
CompletableFuture.allOf(transferFutures)
.thenRun(() -> System.out.println("Transfer complete"));
Double transferAmounts[] = Stream.of(transferFutures)
.map(CompletableFuture::join) // <-- Notice this
.collect(Collectors.toList())
.toArray(new Double[transferFutures.length]);
double totalTransferred = DoubleStream.of(Arrays.stream(transferAmounts)
.mapToDouble(d -> d)
.toArray())
.sum();
System.out.println("Total amount transferred: Rs. " + totalTransferred);
}
}
https://www.baeldung.com/java-completablefuture
https://dzone.com/articles/20-examples-of-using-javas-completablefuture
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html