Skip to content

Instantly share code, notes, and snippets.

@kris-zhang kris-zhang/future.java Secret
Created Jun 11, 2017

Embed
What would you like to do?
CompletableFuture用户学习代码片段
/*
* Alipay.com Inc.
* Copyright (c) 2004-2017 All Rights Reserved.
*/
package com.qunar.kris.share.jdk8;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.val;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
/**
* 54个方法
*
* @author gongzuo.zy
* @version $Id: ConcurrentStudy.java, v0.1 2017-06-02 11:42 gongzuo.zy Exp $
*/
public class ConcurrentStudy {
public static void main(String[] args) throws ExecutionException, InterruptedException {
}
public static void create() {
// 直接创建
CompletableFuture c0 = new CompletableFuture();
// 创建一个已经完成CompletableFuture
val c1 = CompletableFuture.completedFuture("result");
// 无返回值异步任务,会采用内部forkjoin线程池
val c2 = CompletableFuture.runAsync(()->{});
// 无返回值异步任务,采用自己制定的线程池
val c3 = CompletableFuture.runAsync(()->{}, newSingleThreadExecutor());
// 返回值异步任务,自己指定executor
val c4 = CompletableFuture.supplyAsync(()-> "result", newSingleThreadExecutor());
// 返回值异步任务,采用内部forkjoin线程池
val c5 = CompletableFuture.supplyAsync(()-> "result");
// 只要有一个完成,则完成,有一个抛异常,则携带异常
CompletableFuture.anyOf(c1, c2, c3, c4, c5);
// 当所有的 future 完成时,新的 future 同时完成
// 当某个方法出现了异常时,新 future 会在所有 future 完成的时候完成,并且包含一个异常.
CompletableFuture.allOf(c1, c2, c3, c4, c5);
}
public static void get() throws ExecutionException, InterruptedException, TimeoutException {
val cf = CompletableFuture.completedFuture("result");
//不抛出异常
cf.join();
//有异常,需要显示处理
cf.get();
//有异常,需要显示处理
cf.get(1, TimeUnit.HOURS);
//如果有值则直接返回
cf.getNow("absent");
// 是否取消任务
cf.isCancelled();
// 是否有异常
cf.isCompletedExceptionally();
// 是否正常完成
cf.isDone();
}
public static void control() {
val cf = CompletableFuture.completedFuture("result");
// 主动完成
cf.complete("haha");
// 产生异常
cf.completeExceptionally(new RuntimeException());
// 直接取消是否中断正在执行的任务
cf.cancel(false);
}
public static void then() {
val cf = CompletableFuture.completedFuture("result");
// 一种接续,上一个完成后,我们做一些动作
// 我们可以不断的接续调用
// 有返回值版本 apply:有返回值,accept无返回值 run无入参数
cf.thenApply(a->{System.out.println(a);return "a";})
.thenApplyAsync(a->{System.out.println(a);return "b";});
cf.thenApplyAsync(a->{System.out.println(a);return "b";});
cf.thenApplyAsync(a->{System.out.println(a);return "b";}, newSingleThreadExecutor());
// 无返回值版本
cf.thenAccept(System.out::println);
cf.thenAcceptAsync(System.out::println);
cf.thenAcceptAsync(System.out::println, newSingleThreadExecutor());
// 无入参数版本
cf.thenRun(()-> System.out.println("a"));
cf.thenRunAsync(()-> System.out.println("a"));
cf.thenRunAsync(()-> System.out.println("a"), newSingleThreadExecutor());
/////以上是一个future组合一个行为
/////以下是多个future组合
// 将a,b两个future经过bifunction进行组合
cf.thenCombine(cf, (a,b)->"result");
cf.thenCombineAsync(cf, (a,b)->"result");
cf.thenCombineAsync(cf, (a,b)->"result", newSingleThreadExecutor());
cf.thenAcceptBoth(cf, (a,b)->{});
cf.thenAcceptBothAsync(cf, (a,b)->{});
cf.thenAcceptBothAsync(cf, (a,b)->{}, newSingleThreadExecutor());
cf.runAfterBoth(cf, ()->{});
cf.runAfterBothAsync(cf, ()->{});
cf.runAfterBothAsync(cf, ()->{}, newSingleThreadExecutor());
// 只处理第二个返回值,有返回值
cf.applyToEither(cf, a->"result");
cf.applyToEitherAsync(cf, a->"result");
cf.applyToEitherAsync(cf, a->"result", newSingleThreadExecutor());
cf.acceptEither(cf, (a)->System.out.println("haha"));
cf.acceptEitherAsync(cf, (a)->System.out.println("haha"));
cf.acceptEitherAsync(cf, (a)->System.out.println("haha"), newSingleThreadExecutor());
cf.runAfterEither(cf, ()->{});
cf.runAfterEitherAsync(cf, ()->{});
cf.runAfterEitherAsync(cf, ()->{}, newSingleThreadExecutor());
// 自定义future创建
cf.thenCompose(a->CompletableFuture.supplyAsync(()->a));
cf.thenComposeAsync(a->CompletableFuture.supplyAsync(()->a));
cf.thenComposeAsync(a->CompletableFuture.supplyAsync(()->a), newSingleThreadExecutor());
//以下是返回值处理
//参数:返回值,异常 无输出
cf.whenComplete((a, b)->{});
cf.whenCompleteAsync((a, b)->{});
cf.whenCompleteAsync((a, b)->{}, newSingleThreadExecutor());
cf.handle((a, b) -> "");
cf.handleAsync((a, b) -> "");
cf.handleAsync((a, b) -> "", newSingleThreadExecutor());
// 调用这个就开始阻塞,如果产生异常则会返回default,如果没有异常则返回值跟主future一样
cf.exceptionally(a-> "default");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.