Skip to content

Instantly share code, notes, and snippets.

@kris-zhang
Created June 11, 2017 04:28
Show Gist options
  • Save kris-zhang/d62853e90cfb72bbfc2a888b2dfcb784 to your computer and use it in GitHub Desktop.
Save kris-zhang/d62853e90cfb72bbfc2a888b2dfcb784 to your computer and use it in GitHub Desktop.
CompletableFuture用户学习代码片段
/*
* Alipay.com Inc.
* Copyright (c) 2004-2017 All Rights Reserved.
*/
package com.qunar.kris.share.jdk8;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.val;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
/**
* 54个方法
*
* @author gongzuo.zy
* @version $Id: ConcurrentStudy.java, v0.1 2017-06-02 11:42 gongzuo.zy Exp $
*/
public class ConcurrentStudy {
public static void main(String[] args) throws ExecutionException, InterruptedException {
}
public static void create() {
// 直接创建
CompletableFuture c0 = new CompletableFuture();
// 创建一个已经完成CompletableFuture
val c1 = CompletableFuture.completedFuture("result");
// 无返回值异步任务,会采用内部forkjoin线程池
val c2 = CompletableFuture.runAsync(()->{});
// 无返回值异步任务,采用自己制定的线程池
val c3 = CompletableFuture.runAsync(()->{}, newSingleThreadExecutor());
// 返回值异步任务,自己指定executor
val c4 = CompletableFuture.supplyAsync(()-> "result", newSingleThreadExecutor());
// 返回值异步任务,采用内部forkjoin线程池
val c5 = CompletableFuture.supplyAsync(()-> "result");
// 只要有一个完成,则完成,有一个抛异常,则携带异常
CompletableFuture.anyOf(c1, c2, c3, c4, c5);
// 当所有的 future 完成时,新的 future 同时完成
// 当某个方法出现了异常时,新 future 会在所有 future 完成的时候完成,并且包含一个异常.
CompletableFuture.allOf(c1, c2, c3, c4, c5);
}
public static void get() throws ExecutionException, InterruptedException, TimeoutException {
val cf = CompletableFuture.completedFuture("result");
//不抛出异常
cf.join();
//有异常,需要显示处理
cf.get();
//有异常,需要显示处理
cf.get(1, TimeUnit.HOURS);
//如果有值则直接返回
cf.getNow("absent");
// 是否取消任务
cf.isCancelled();
// 是否有异常
cf.isCompletedExceptionally();
// 是否正常完成
cf.isDone();
}
public static void control() {
val cf = CompletableFuture.completedFuture("result");
// 主动完成
cf.complete("haha");
// 产生异常
cf.completeExceptionally(new RuntimeException());
// 直接取消是否中断正在执行的任务
cf.cancel(false);
}
public static void then() {
val cf = CompletableFuture.completedFuture("result");
// 一种接续,上一个完成后,我们做一些动作
// 我们可以不断的接续调用
// 有返回值版本 apply:有返回值,accept无返回值 run无入参数
cf.thenApply(a->{System.out.println(a);return "a";})
.thenApplyAsync(a->{System.out.println(a);return "b";});
cf.thenApplyAsync(a->{System.out.println(a);return "b";});
cf.thenApplyAsync(a->{System.out.println(a);return "b";}, newSingleThreadExecutor());
// 无返回值版本
cf.thenAccept(System.out::println);
cf.thenAcceptAsync(System.out::println);
cf.thenAcceptAsync(System.out::println, newSingleThreadExecutor());
// 无入参数版本
cf.thenRun(()-> System.out.println("a"));
cf.thenRunAsync(()-> System.out.println("a"));
cf.thenRunAsync(()-> System.out.println("a"), newSingleThreadExecutor());
/////以上是一个future组合一个行为
/////以下是多个future组合
// 将a,b两个future经过bifunction进行组合
cf.thenCombine(cf, (a,b)->"result");
cf.thenCombineAsync(cf, (a,b)->"result");
cf.thenCombineAsync(cf, (a,b)->"result", newSingleThreadExecutor());
cf.thenAcceptBoth(cf, (a,b)->{});
cf.thenAcceptBothAsync(cf, (a,b)->{});
cf.thenAcceptBothAsync(cf, (a,b)->{}, newSingleThreadExecutor());
cf.runAfterBoth(cf, ()->{});
cf.runAfterBothAsync(cf, ()->{});
cf.runAfterBothAsync(cf, ()->{}, newSingleThreadExecutor());
// 只处理第二个返回值,有返回值
cf.applyToEither(cf, a->"result");
cf.applyToEitherAsync(cf, a->"result");
cf.applyToEitherAsync(cf, a->"result", newSingleThreadExecutor());
cf.acceptEither(cf, (a)->System.out.println("haha"));
cf.acceptEitherAsync(cf, (a)->System.out.println("haha"));
cf.acceptEitherAsync(cf, (a)->System.out.println("haha"), newSingleThreadExecutor());
cf.runAfterEither(cf, ()->{});
cf.runAfterEitherAsync(cf, ()->{});
cf.runAfterEitherAsync(cf, ()->{}, newSingleThreadExecutor());
// 自定义future创建
cf.thenCompose(a->CompletableFuture.supplyAsync(()->a));
cf.thenComposeAsync(a->CompletableFuture.supplyAsync(()->a));
cf.thenComposeAsync(a->CompletableFuture.supplyAsync(()->a), newSingleThreadExecutor());
//以下是返回值处理
//参数:返回值,异常 无输出
cf.whenComplete((a, b)->{});
cf.whenCompleteAsync((a, b)->{});
cf.whenCompleteAsync((a, b)->{}, newSingleThreadExecutor());
cf.handle((a, b) -> "");
cf.handleAsync((a, b) -> "");
cf.handleAsync((a, b) -> "", newSingleThreadExecutor());
// 调用这个就开始阻塞,如果产生异常则会返回default,如果没有异常则返回值跟主future一样
cf.exceptionally(a-> "default");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment