@rvteja92
Last active May 16, 2019 17:31

CompletableFuture

Definition

A CompletableFuture object is a placeholder for a result that might not be available yet but will be available in the future.

The CompletableFuture class provides mechanisms for performing asynchronous operations. It implements both the Future and the CompletionStage interfaces.

The computations for obtaining the results need not be asynchronous.
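To make the "placeholder" idea concrete, here is a minimal sketch (the class name PlaceholderDemo is made up for illustration). It creates an empty CompletableFuture, completes it on the same thread, and records the state before and after. No asynchronous work is involved.

```java
import java.util.concurrent.CompletableFuture;

class PlaceholderDemo {
    // Returns a short trace of the future's state before and after completion.
    static String describe() {
        CompletableFuture<String> placeholder = new CompletableFuture<>();
        boolean doneBefore = placeholder.isDone();   // nothing has completed it yet
        placeholder.complete("ready");               // completed synchronously, on this thread
        return doneBefore + " -> " + placeholder.isDone() + " (" + placeholder.join() + ")";
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "false -> true (ready)"
    }
}
```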

Creating a CompletableFuture

Let us create a transfer() method that will return a CompletableFuture object.

import java.util.concurrent.CompletableFuture;

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {
        
        return CompletableFuture.completedFuture(true);
    }
}

The same effect can be achieved with the following changes in the method.

import java.util.concurrent.CompletableFuture;

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {
        
        CompletableFuture<Boolean> isTransferSuccessful = new CompletableFuture<>();
        isTransferSuccessful.complete(true);
        return isTransferSuccessful;
    }
}

Consuming the CompletableFuture - thenRun() method

Now we want to use the transfer() method and, assuming it always succeeds, print a message.

import bankutils.Accounts;
import bankutils.TransferUtils;

public class TransferController {
    public static void main(String args[]) {
        TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
            () -> System.out.println("The transaction has been completed successfully.")
        );
    }
}

The thenRun() method takes a Runnable (which accepts no arguments and returns nothing) and executes it once the CompletableFuture has completed.

Is CompletableFuture asynchronous?

Let us say that our transfer() method has multiple validations to make and takes some time to run.

import java.util.concurrent.CompletableFuture;

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {

        try {
            Thread.sleep(5000); //                  <-- Introduced delay
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            return CompletableFuture.completedFuture(true);
        }
    }
}

And we change our controller like this.

public class TransferController {
    public static void main(String args[]) {
        TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
            () -> System.out.println("The transaction has been completed successfully.") // <-- line 1
        );
        System.out.println("Work done. Exiting controller"); // <-- line 2
    }
}

What is the output going to be?

CompletableFuture is not inherently asynchronous

Why did line 2 wait for line 1 to be executed first?

Let us play with the threads to understand the flow.

We delegate the printing to a helper method that also reports the running thread.

public class Helper {
    synchronized public static void printWithStatus(String message) {   // <-- Notice synchronized
        System.out.println("****");
        System.out.println(message);
        System.out.println("Running thread: " + Thread.currentThread().getId());
        System.out.println("Total threads running: " + Thread.activeCount());
        System.out.println("****\n");
    }
}

import java.util.concurrent.CompletableFuture;

import static bankutils.Helper.printWithStatus;

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            printWithStatus("Completing transfer");

            return CompletableFuture.completedFuture(true);
        }
    }
}

public class TransferController {
    public static void main(String args[]) {
        printWithStatus("Starting Transfer controller"); // <-- line 1
        
        TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
            () ->  printWithStatus("The transaction has been completed successfully.") // <-- line 2
        );

        printWithStatus("Work done. Exiting controller"); // <-- line 3
    }
}

Let us introduce some delay in the consumer as well.

public class TransferController {
    public static void main(String args[]) {
        printWithStatus("Starting Transfer controller"); // <-- line 1

        TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
            () ->  {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    printWithStatus("The transaction has been completed successfully."); // <-- line 2
                }
            }
        );

        printWithStatus("Work done. Exiting controller"); // <-- line 3
    }
}

thenRun() does not run the consumer asynchronously.
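The claim can be checked directly. The sketch below (SameThreadDemo is a hypothetical name) registers a callback on an already completed CompletableFuture and compares the thread that ran it with the thread that called thenRun().

```java
import java.util.concurrent.CompletableFuture;

class SameThreadDemo {
    // Returns true if the thenRun() callback ran on the thread that registered it.
    static boolean callbackRanOnCallingThread() {
        Thread caller = Thread.currentThread();
        Thread[] callbackThread = new Thread[1];

        // The future is already complete, so thenRun() runs the callback immediately.
        CompletableFuture.completedFuture(true)
                .thenRun(() -> callbackThread[0] = Thread.currentThread());

        return callbackThread[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println("Ran on calling thread: " + callbackRanOnCallingThread());
    }
}
```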

Working asynchronously

  1. Completing the value asynchronously with supplyAsync()

    public class TransferUtils {
        public static CompletableFuture<Boolean> transfer(
                Accounts fromAccount, Accounts toAccount, float amount) {
            
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    printWithStatus("Completing transfer");
                    return true;
                }
            });
        }
    }

    public class TransferController {
        public static void main(String args[]) {
            printWithStatus("Starting Transfer controller"); // <-- line 1
    
            TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRun(
                () ->  {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        printWithStatus("The transaction has been completed successfully."); // <-- line 2
                    }
                }
            );
    
            printWithStatus("Work done. Exiting controller"); // <-- line 3
            waitForThreads();           // <-- Let's wait for all threads to complete
        }
    }

    public class Helper {
        public static void waitForThreads() {
            try {
                ForkJoinPool.commonPool().awaitTermination(12, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    If the consumer methods are not asynchronous themselves, the thread that completed (resolved) the CompletableFuture also runs the function supplied to thenRun(). If the CompletableFuture is already complete when thenRun() is called, the calling thread runs it instead.

    Most of the time we do not control how the CompletableFuture is resolved.

  2. Use thenRunAsync() method to make the consuming function asynchronous

    public class TransferController {
        public static void main(String args[]) {
            printWithStatus("Starting Transfer controller"); // <-- line 1
    
            TransferUtils.transfer(Accounts.A, Accounts.B, 200).thenRunAsync(  // <-- Notice this
                () ->  {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        printWithStatus("The transaction has been completed successfully."); // <-- line 2
                    }
                }
            );
    
            printWithStatus("Work done. Exiting controller"); // <-- line 3
            waitForThreads();
        }
    }
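The two approaches above can be compared in one small sketch. It is an illustration only: ThreadChoiceDemo and the thread name "completer" are made up, and the future is completed by hand from a named thread instead of through supplyAsync(). The thenRun() callback should report the completing thread, while the thenRunAsync() callback should report some other thread (by default one supplied by the common pool).

```java
import java.util.concurrent.CompletableFuture;

class ThreadChoiceDemo {
    // names[0]: thread that ran the thenRun() callback
    // names[1]: thread that ran the thenRunAsync() callback
    static String[] callbackThreadNames() {
        CompletableFuture<Boolean> transfer = new CompletableFuture<>();
        String[] names = new String[2];

        // Both callbacks are registered while the future is still incomplete.
        CompletableFuture<Void> plain =
                transfer.thenRun(() -> names[0] = Thread.currentThread().getName());
        CompletableFuture<Void> async =
                transfer.thenRunAsync(() -> names[1] = Thread.currentThread().getName());

        // A dedicated thread resolves the future.
        Thread completer = new Thread(() -> transfer.complete(true), "completer");
        completer.start();

        // Wait for both callbacks to finish before reading the names.
        plain.join();
        async.join();
        return names;
    }

    public static void main(String[] args) {
        String[] names = callbackThreadNames();
        System.out.println("thenRun ran on:      " + names[0]);
        System.out.println("thenRunAsync ran on: " + names[1]);
    }
}
```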

Order of computations

Let's go back to our synchronous CompletableFuture.

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            printWithStatus("Completing transfer");

            return CompletableFuture.completedFuture(true);
        }
    }
}

Then add multiple asynchronous computations to the CompletableFuture.

public class TransferController {
    public static void main(String args[]) {
        CompletableFuture<Boolean> transferFuture = TransferUtils.transfer(Accounts.A, Accounts.B, 200);

        transferFuture.thenRunAsync(() ->  {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            printWithStatus("First function completed successfully."); // <-- line 2
        });

        transferFuture.thenRunAsync(() ->  {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            printWithStatus("Second function completed successfully."); // <-- line 2
        });

        waitForThreads();           // <-- Let's wait for all threads to complete
    }
}

The order of completion is unpredictable because different threads run the different computations.

Let's make the computations synchronous.

public class TransferController {
    public static void main(String args[]) {
        CompletableFuture<Boolean> transferFuture = TransferUtils.transfer(Accounts.A, Accounts.B, 200);

        transferFuture.thenRun(() ->  {
            printWithStatus("First function completed successfully."); // <-- line 2
        });

        transferFuture.thenRun(() ->  {
            printWithStatus("Second function completed successfully."); // <-- line 2
        });

        waitForThreads();           // <-- Let's wait for all threads to complete
    }
}

Only a single thread is running, so the order is predictable.

Now let's make the method that returns the CompletableFuture asynchronous.

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {

        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                printWithStatus("Completing transfer");
                return true;
            }
        });
    }
}

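Now the computations run in reverse order.

CompletableFuture registers the pending computations in a stack, hence the reverse order. This is an implementation detail rather than documented behaviour, so do not rely on it.

This holds even if we introduce some delay in the computations, as in: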

public class TransferController {
    public static void main(String args[]) {
        CompletableFuture<Boolean> transferFuture = TransferUtils.transfer(Accounts.A, Accounts.B, 200);

        transferFuture.thenRun(() ->  {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            printWithStatus("First function completed successfully."); // <-- line 2
        });

        transferFuture.thenRun(() ->  {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            printWithStatus("Second function completed successfully."); // <-- line 2
        });

        waitForThreads();           // <-- Let's wait for all threads to complete
    }
}

We can see that the same thread that completed the CompletableFuture runs the computations. It runs them serially, in reverse order, on a single thread. So each computation may block (delay) the ones that follow it.

Proceed with caution when adding multiple computations to a single CompletableFuture. Avoid doing so if the computations depend on each other.
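When one computation must run after another, one option is to chain them instead of attaching both to the same CompletableFuture. The sketch below is an illustration under that assumption (OrderedChainDemo is a hypothetical name). The second thenRun() is registered on the future returned by the first, so it cannot start until the first has finished.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OrderedChainDemo {
    // Returns the order in which the two chained computations ran.
    static List<String> runInOrder() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Boolean> transfer = CompletableFuture.supplyAsync(() -> true);

        transfer.thenRun(() -> log.add("first"))    // runs once the transfer completes
                .thenRun(() -> log.add("second"))   // runs once "first" completes
                .join();                            // wait for the whole chain

        return log;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder()); // prints "[first, second]"
    }
}
```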

Working with the results of CompletableFuture

We want our controller to act based on the result. Let's change our transfer() method to

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {

        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                return ThreadLocalRandom.current().nextBoolean();
            }
        });
    }
}

and our controller to

public class TransferController {
    public static void main(String args[]) throws ExecutionException, InterruptedException {
        boolean isTransferSuccessful = TransferUtils.transfer(
                Accounts.A, Accounts.B, 200).get();     // <-- Notice This

        if (isTransferSuccessful) {
            System.out.println("Transfer completed successfully.");
        } else {
            System.out.println("Transfer failed");
        }

    }
}

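The get() method is blocking: it waits until the CompletableFuture completes and then returns the result. It declares two checked exceptions: ExecutionException (thrown if the CompletableFuture completed exceptionally) and InterruptedException (thrown if the waiting thread is interrupted).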

public class TransferController {               //  <-- Notice no Exceptions
    public static void main(String args[]) {
        boolean isTransferSuccessful = TransferUtils.transfer(
                    Accounts.A, Accounts.B, 200)
                .join();     // <-- Notice This

        if (isTransferSuccessful) {
            System.out.println("Transfer completed successfully.");
        } else {
            System.out.println("Transfer failed");
        }

    }
}

The join() method behaves similarly, except that it throws an unchecked exception (CompletionException) if the CompletableFuture completed exceptionally.

Working with results asynchronously

We can achieve the same result without blocking by using thenAccept() or thenAcceptAsync().

public class TransferController {
    public static void main(String args[]) {
        TransferUtils.transfer(Accounts.A, Accounts.B, 200)
                .thenAccept((isTransferSuccessful) -> {                 //  <-- Notice
                    if (isTransferSuccessful) {
                        System.out.println("Transfer completed successfully.");
                    } else {
                        System.out.println("Transfer failed");
                    }
                });
        waitForThreads();   // <-- Without this, main() may exit before the callback runs
    }
}

thenAcceptAsync() can be used similarly.

public class TransferController {
    public static void main(String args[]) {
        TransferUtils.transfer(Accounts.A, Accounts.B, 200)
                .thenAcceptAsync((isTransferSuccessful) -> {                 //  <-- Notice
                    if (isTransferSuccessful) {
                        System.out.println("Transfer completed successfully.");
                    } else {
                        System.out.println("Transfer failed");
                    }
                });
        waitForThreads();
    }
}

All async methods of CompletableFuture (those suffixed with Async) have an overload that takes an Executor as an additional last parameter.

public class TransferController {
    public static void main(String args[]) throws InterruptedException {
        // Declared as ExecutorService so that no casts are needed for shutdown().
        ExecutorService cachedExecutor = Executors.newCachedThreadPool();

        TransferUtils.transfer(Accounts.A, Accounts.B, 200)
                .thenAcceptAsync((isTransferSuccessful) -> {
                    if (isTransferSuccessful) {
                        System.out.println("Transfer completed successfully.");
                    } else {
                        System.out.println("Transfer failed");
                    }
                    cachedExecutor.shutdown();
                }, cachedExecutor);                                     //  <-- Notice

        cachedExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

Java uses the supplied executor to run the computation asynchronously.
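One way to see this is to give the executor's thread a recognisable name and have the callback report the thread it runs on. This is a sketch only: CustomExecutorDemo and the thread name "transfer-worker" are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorDemo {
    // Returns the name of the thread that ran the async callback.
    static String callbackThreadName() {
        // A single-thread executor whose only thread carries a recognisable name.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "transfer-worker"));
        try {
            return CompletableFuture.completedFuture(true)
                    .thenApplyAsync(ok -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown(); // let the JVM exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints "transfer-worker"
    }
}
```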

Handling exceptions in CompletableFuture

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {
        CompletableFuture<Boolean> isTransferSuccessful = new CompletableFuture<>();
        ForkJoinPool.commonPool().submit(() -> {
            throw new ArithmeticException("Bad values");
        });
        return isTransferSuccessful;
    }
}

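If an exception is thrown while resolving the CompletableFuture, as above, it never reaches the CompletableFuture: the future is never completed, so the functions passed to the then methods are never called.

To handle such cases, Java provides the completeExceptionally() method.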

public class TransferUtils {
    public static CompletableFuture<Boolean> transfer(
            Accounts fromAccount, Accounts toAccount, float amount) {
        CompletableFuture<Boolean> isTransferSuccessful = new CompletableFuture<>();
        ForkJoinPool.commonPool().submit(() -> {
            isTransferSuccessful.completeExceptionally(new ArithmeticException("Bad values"));
        });
        return isTransferSuccessful;
    }
}

public class TransferController {
    public static void main(String args[]) {
        TransferUtils.transfer(Accounts.A, Accounts.B, 200)
                .thenAcceptAsync((isTransferSuccessful) -> {                 //  <-- Notice
                    if (isTransferSuccessful) {
                        System.out.println("Transfer completed successfully.");
                    } else {
                        System.out.println("Transfer failed");
                    }
                    System.out.println("Operation complete");
                });
        waitForThreads();
    }
}

When the CompletableFuture completes exceptionally, the functions supplied to the then methods are never called.

public class TransferController {
    public static void main(String args[]) throws ExecutionException, InterruptedException {
        TransferUtils.transfer(Accounts.A, Accounts.B, 200).get();
        waitForThreads();
    }
}

get() throws an ExecutionException with the provided exception as its cause.

public class TransferController {
    public static void main(String args[]) {
        TransferUtils.transfer(Accounts.A, Accounts.B, 200).join();
        waitForThreads();
    }
}

Similarly, join() wraps the cause in an unchecked CompletionException. To handle the exception without blocking, in the same callback style as the then methods, use whenComplete().

public class TransferController {
    public static void main(String args[]) {
        CompletableFuture<Boolean> transferFuture = TransferUtils.transfer(Accounts.A, Accounts.B, 200);
        transferFuture.whenComplete((isTransferSuccessful, exception) -> {
                    if (exception == null)
                        System.out.println("Transfer result: " + isTransferSuccessful);
                    else
                        System.out.println("Failed to transfer");
                });
        waitForThreads();
    }
}
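The three ways of observing a failure described in this section can be put side by side. The sketch below is illustrative (FailureHandlingDemo and its method names are hypothetical). It completes a CompletableFuture exceptionally on the calling thread, so no thread pool is involved, and reports what each approach sees.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class FailureHandlingDemo {
    // A future that has already failed with the exception used in this section.
    static CompletableFuture<Boolean> failedTransfer() {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        future.completeExceptionally(new ArithmeticException("Bad values"));
        return future;
    }

    // get(): checked ExecutionException, original exception available as the cause.
    static String viaGet() {
        try {
            failedTransfer().get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join(): unchecked CompletionException, original exception available as the cause.
    static String viaJoin() {
        try {
            failedTransfer().join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    // whenComplete(): the callback receives either a result or an exception.
    static String viaWhenComplete() {
        StringBuilder seen = new StringBuilder();
        failedTransfer().whenComplete((result, error) ->
                seen.append(error == null ? "result: " + result : error.getMessage()));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println("get():          " + viaGet());
        System.out.println("join():         " + viaJoin());
        System.out.println("whenComplete(): " + viaWhenComplete());
    }
}
```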

Modifying the resolved value

thenApply() and thenApplyAsync() can be used to transform the result of a CompletableFuture.

public class TransferUtils {
    static float TRANSFER_CHARGES = 5;

    private static CompletableFuture<Double> transferExcludingCharges(
            Accounts fromAccount, Accounts toAccount, Double amount) {
        return CompletableFuture.supplyAsync(() -> amount);
    }

    public static CompletableFuture<Double> transfer(
            Accounts fromAccount, Accounts toAccount, Double amount) {
        return transferExcludingCharges(fromAccount, toAccount, amount)
                .thenApply((transferredAmount) -> transferredAmount * (100 + TRANSFER_CHARGES) / 100);
    }
}

And our controller looks like this.

public class TransferController {
    public static void main(String args[]) {
        TransferUtils.transfer(Accounts.A, Accounts.B, 200.0)
                .thenAccept((amount) -> {
                    System.out.println("Total debited: " + amount);
                });
        waitForThreads();
    }
}

The same effect can be achieved with the thenCompose() method.

public class TransferUtils {
    static float TRANSFER_CHARGES = 5;

    private static CompletableFuture<Double> transferExcludingCharges(
            Accounts fromAccount, Accounts toAccount, Double amount) {
        return CompletableFuture.supplyAsync(() -> amount);
    }

    public static CompletableFuture<Double> transfer(
            Accounts fromAccount, Accounts toAccount, Double amount) {
        return transferExcludingCharges(fromAccount, toAccount, amount)
                .thenCompose((transferredAmount) -> CompletableFuture.supplyAsync(
                            () -> transferredAmount * (100 + TRANSFER_CHARGES) / 100));
    }
}

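The difference is that the function supplied to thenCompose() returns a CompletableFuture (more generally, a CompletionStage), whereas the function supplied to thenApply() returns a plain value.

Both thenApply() and thenCompose() return a CompletableFuture that carries the transformed value, so further computations can be chained on that value. thenRun() and thenAccept() also return a CompletableFuture, but it is a CompletableFuture<Void>, so the stages chained after them receive no value.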
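The difference shows up in the types. In the sketch below (ApplyVsComposeDemo and its helpers are hypothetical, and the 5% charge is hard-coded), passing a function that itself returns a CompletableFuture to thenApply() produces a nested future, while thenCompose() flattens it.

```java
import java.util.concurrent.CompletableFuture;

class ApplyVsComposeDemo {
    // Stand-in for the transfer itself: resolves to the transferred amount.
    static CompletableFuture<Double> transferred(double amount) {
        return CompletableFuture.supplyAsync(() -> amount);
    }

    // Stand-in for an asynchronous step that adds a 5% charge.
    static CompletableFuture<Double> addCharges(double amount) {
        return CompletableFuture.supplyAsync(() -> amount * 105 / 100);
    }

    // thenApply() with a plain function: the result is CompletableFuture<Double>.
    static double viaThenApply(double amount) {
        return transferred(amount).thenApply(a -> a * 105 / 100).join();
    }

    // thenCompose() with a future-returning function: still CompletableFuture<Double>.
    static double viaThenCompose(double amount) {
        return transferred(amount).thenCompose(ApplyVsComposeDemo::addCharges).join();
    }

    // thenApply() with the same future-returning function: the futures are nested.
    static double viaNestedThenApply(double amount) {
        CompletableFuture<CompletableFuture<Double>> nested =
                transferred(amount).thenApply(ApplyVsComposeDemo::addCharges);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply(200.0));       // prints 210.0
        System.out.println(viaThenCompose(200.0));     // prints 210.0
        System.out.println(viaNestedThenApply(200.0)); // prints 210.0
    }
}
```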

Combining Futures

If we want to work with the results of two CompletableFutures, we can use thenCombine().

public class TransferUtils {
    static float TRANSFER_CHARGES = 5;

    private static CompletableFuture<Double> transferExcludingCharges(
            Accounts fromAccount, Accounts toAccount, Double amount) {
        return CompletableFuture.supplyAsync(() -> amount);
    }

    private static CompletableFuture<Double> deductTransferFee(Accounts fromAccount, Double amount) {
        return transferExcludingCharges(fromAccount, Accounts.C, amount * TRANSFER_CHARGES / 100);
    }

    public static CompletableFuture<Double> transfer(
            Accounts fromAccount, Accounts toAccount, Double amount) {
        CompletableFuture<Double> transferFuture = transferExcludingCharges(fromAccount, toAccount, amount);
        CompletableFuture<Double> transferChargesFuture = deductTransferFee(fromAccount, amount);

        return transferFuture.thenCombine(transferChargesFuture,
                (transferAmount, chargesAmount) -> transferAmount + chargesAmount);
    }
}

Similarly, there are thenCombineAsync(), thenAcceptBoth(), runAfterBoth(), runAfterEither(), acceptEither() and applyToEither().
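As a rough illustration of two of these methods, the sketch below (EitherAndBothDemo is a hypothetical name, and the amounts are arbitrary) uses applyToEither() to continue with whichever future finishes first and thenAcceptBoth() to consume two results together. One of the futures is deliberately never completed, so the outcome is deterministic.

```java
import java.util.concurrent.CompletableFuture;

class EitherAndBothDemo {
    // applyToEither(): continue with the result of whichever future completes first.
    static String firstAvailable() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> done = CompletableFuture.completedFuture("fast");
        return pending.applyToEither(done, winner -> "winner: " + winner).join();
    }

    // thenAcceptBoth(): consume both results once both futures have completed.
    static double debitedTotal() {
        CompletableFuture<Double> amount = CompletableFuture.completedFuture(200.0);
        CompletableFuture<Double> charges = CompletableFuture.completedFuture(10.0);
        double[] total = new double[1];
        amount.thenAcceptBoth(charges, (a, c) -> total[0] = a + c).join();
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println(firstAvailable()); // prints "winner: fast"
        System.out.println(debitedTotal());   // prints 210.0
    }
}
```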

Working with a set of CompletableFutures

public class TransferUtils {
    // Resolves to the transferred amount so that callers can aggregate the results.
    private static CompletableFuture<Double> transfer(
            String fromAccountNo, String toAccountNumber, Double amount) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(5000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return amount;
        });
    }

    public static CompletableFuture[] transferSalaries() {
        String[] employeeAccounts = {"1", "2", "3", "4", "5", "6", "7", "8"};
        String companyAccount = "123";
        return Arrays.stream(employeeAccounts)
                .map(employeeAccount -> transfer(companyAccount, employeeAccount, 1000.0))
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[employeeAccounts.length]);
    }
}

public class TransferController {
    public static void main(String args[]) {
        CompletableFuture[] transferFutures = TransferUtils.transferSalaries();

        CompletableFuture.anyOf(transferFutures)
                .thenRun(() -> System.out.println("Transfer started"));

        CompletableFuture.allOf(transferFutures)
                .thenRun(() -> System.out.println("Transfer complete"));

        waitForThreads();
    }
}

anyOf() and allOf() each return a new CompletableFuture: the one from anyOf() completes when any of the supplied CompletableFutures completes, and the one from allOf() completes when all of them have completed.

We can use streams and join() together to consume the values of a set of CompletableFutures.

public class TransferController {
    public static void main(String args[]) {
        CompletableFuture[] transferFutures = TransferUtils.transferSalaries();

        CompletableFuture.anyOf(transferFutures)
                .thenRun(() -> System.out.println("Transfer started"));

        CompletableFuture.allOf(transferFutures)
                .thenRun(() -> System.out.println("Transfer complete"));

        // Each join() blocks until that transfer completes and returns its value.
        double totalTransferred = Stream.of(transferFutures)
                .mapToDouble(future -> (Double) future.join())   // <-- Notice this
                .sum();

        System.out.println("Total amount transferred: Rs. " + totalTransferred);
    }
}
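A typed variant of the same aggregation is sketched below. It is an illustration, not the only way to do this: AggregateDemo and its helper are hypothetical names, and each transfer simply resolves to the salary amount. It keeps the futures in a List<CompletableFuture<Double>> to avoid raw types, and sums the results only after allOf() reports that every transfer has completed, so the join() calls inside the callback do not block.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AggregateDemo {
    // Stand-in for a salary transfer: resolves to the amount transferred.
    static CompletableFuture<Double> transfer(double amount) {
        return CompletableFuture.supplyAsync(() -> amount);
    }

    static double totalTransferred(int employees, double salary) {
        List<CompletableFuture<Double>> transfers = IntStream.range(0, employees)
                .mapToObj(i -> transfer(salary))
                .collect(Collectors.toList());

        // allOf() yields a CompletableFuture<Void>; it carries no results itself.
        CompletableFuture<Void> all =
                CompletableFuture.allOf(transfers.toArray(new CompletableFuture[0]));

        // By the time this function runs, every transfer is complete.
        return all.thenApply(ignored -> transfers.stream()
                        .mapToDouble(CompletableFuture::join)
                        .sum())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("Total amount transferred: Rs. " + totalTransferred(8, 1000.0));
    }
}
```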

References

https://www.baeldung.com/java-completablefuture

https://dzone.com/articles/20-examples-of-using-javas-completablefuture

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
