在Java8中新增了CompletableFuture
类,该类实现了Future
和CompletionStage
接口。提供了强大的Future
扩展功能,简化了异步编程的复杂性,提供了函数式编程的能力。可通过异步回调方式处理结果,还可以对任务进行组合处理。
概览
创建异步任务 CompletableFuture
提供了四个静态方法来创建一个异步操作。
1 2 3 4 5 6 public static CompletableFuture<Void> runAsync (Runnable runnable) public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor)
没有指定Executor
的话默认会使用ForkJoinPool.commonPool()
作为缺省线程池执行异步代码,其中ForkJoinPool.commonPool()
核心线程数量为CPU-1 核心数。如果指定线程池,则使用指定的线程池执行任务。
runAsync 以Runnable
函数式接口类型为参数,无返回值。
1 2 3 4 5 6 7 8 9 private static void runAsync () throws Exception { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("runAsync线程名称: " + Thread.currentThread().getName()); }); future.get(); }
supplyAsync 以Supplier<U>
函数式接口类型为参数,CompletableFuture
的计算结果类型为U
。
1 2 3 4 5 6 7 8 9 10 11 private static void supplyAsync () throws Exception { CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsync线程名称: " + Thread.currentThread().getName()); return System.currentTimeMillis(); }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
以异步场景为例,可与List结合使用。
1 2 3 4 5 6 7 8 9 10 List<Integer> collect = Lists.newArrayList(2 , 1 , 3 ) .stream() .map(i -> CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(1 , TimeUnit.SECONDS); return i; })) .collect(Collectors.toList()) .stream() .map(CompletableFuture::join).collect(Collectors.toList());
异步回调 thenRun/thenRunAsync 1 public CompletableFuture<Void> thenRun (Runnable action) ;
执行完第一个任务再执行第二个任务,前后两个任务没有参数传递,第二个任务(thenRun
)也没有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static void thenRun () throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int result = new Random().nextInt(100 ); System.out.println("supplyAsync线程名称: " + Thread.currentThread().getName() + "。结果为:" + result); return result; }).thenRun(() -> { System.out.println("thenRun线程名称: " + Thread.currentThread().getName() + "。无参数,无返回值" ); }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
thenRun和thenRunAsync区别 1 2 3 4 5 6 7 8 9 private static final Executor asyncPool = useCommonPool? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); } public CompletableFuture<Void> thenRunAsync (Runnable action) { return uniRunStage(asyncPool, action); }
如果在执行第一个任务的时候传入了一个自定义线程池:
调用thenRun
方法执行第二个任务时,第二个任务和第一个任务共用一个线程池;
调用thenRunAsync
方法执行第二个任务时,第一个任务使用自己传入的线程池,第二个任务使用ForkJoinPool
;
后面所说的thenAccept
和thenAcceptAsync
、thenApply
和thenApplyAsync
等,它们之间的区别也是如此。
thenAccept/thenAcceptAsync 1 public CompletableFuture<Void> thenAccept (Consumer<? super T> action)
执行完第一个任务后,将执行结果作为入参传递到回调方法(thenAccept
)中,回调方法无返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void thenAccept () throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int result = new Random().nextInt(100 ); System.out.println("supplyAsync线程名称: " + Thread.currentThread().getName() + "。结果为:" + result); return result; }).thenAccept(arg -> { int result = arg * 10 ; System.out.println("thenAccept线程名称: " + Thread.currentThread().getName() + "。结果为:" + result); }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
thenApply/thenApplyAsync 1 public <U> CompletableFuture<U> thenApply (Function<? super T,? extends U> fn)
执行完第一个任务后,将执行结果作为入参传递到回调方法(thenApply
)中,回调方法有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static void thenApply () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int result = new Random().nextInt(100 ); System.out.println("supplyAsync线程名称: " + Thread.currentThread().getName() + "。结果为:" + result); return result; }).thenApply(arg -> { int result = arg * 10 ; System.out.println("thenApply线程名称: " + Thread.currentThread().getName() + "。结果为:" + result); return result; }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
exceptionally 1 public CompletableFuture<T> exceptionally (Function<Throwable, ? extends T> fn)
某个任务执行异常时,执行的回调方法,并且将抛出异常作为参数,传递到回调方法,exceptionally
方法有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private static void exceptionally () throws Exception { CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsync线程名称: " + Thread.currentThread().getName()); throw new RuntimeException(); }).exceptionally(e -> { e.printStackTrace(); return "系统异常" ; }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
whenComplete 1 public CompletableFuture<T> whenComplete (BiConsumer<? super T, ? super Throwable> action)
某个任务执行完成后,将上个任务的结果和异常传递到回调方法whenComplete
中,无返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static void whenComplete () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int result = new Random().nextInt(100 ); System.out.println("supplyAsync线程名称: " + Thread.currentThread().getName() + "。结果为:" + result); return result; }).whenComplete((arg, e) -> { System.out.println("whenComplete线程名称: " + Thread.currentThread().getName() + "。参数为:" + arg); }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
handle 1 public <U> CompletableFuture<U> handle (BiFunction<? super T, Throwable, ? extends U> fn)
某个任务执行完成后,将上个任务的结果和异常传递到回调方法handle
中,有返回值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static void handle () throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int result = new Random().nextInt(100 ); System.out.println("supplyAsync线程名称: " + Thread.currentThread().getName() + "。结果为:" + result); return result; }).handle((arg, e) -> { int result = arg * 10 ; System.out.println("handle线程名称: " + Thread.currentThread().getName() + "。结果为:" + result); return result; }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
任务组合 AND组合关系 1 2 3 public <U,V> CompletableFuture<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U> CompletionStage<Void> thenAcceptBoth (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public CompletionStage<Void> runAfterBoth (CompletionStage<?> other, Runnable action)
thenCombine
/thenAcceptBoth
/runAfterBoth
都表示:将两个CompletableFuture
组合起来,只有这两个都正常执行完成才会执行某个任务 。区别为:
thenCombine
:将两个任务的执行结果作为方法入参,传递到指定方法中,有返回值
thenAcceptBoth
: 会将两个任务的执行结果作为方法入参,传递到指定方法中,无返回值
runAfterBoth
:不会把执行结果当做方法入参,没有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static void thenCombine () throws Exception { CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsync1线程名称: " + Thread.currentThread().getName() + "。结果为:100" ); return 100 ; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsync2线程名称: " + Thread.currentThread().getName() + "。结果为:200" ); return 200 ; }); CompletableFuture<Integer> future = f1.thenCombine(f2, (arg1, arg2) -> { System.out.println("thenCombine线程名称: " + Thread.currentThread().getName() + "。结果1为:" + arg1 + "。结果2为:" + arg2); return arg1 + arg2; }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
OR组合关系 1 2 3 public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other,Function<? super T, U> fn) ;public CompletableFuture<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action) ;public CompletableFuture<Void> runAfterEither (CompletionStage<?> other, Runnable action) ;
applyToEither
/acceptEither
/runAfterEither
都表示:将两个CompletableFuture
组合起来,只要其中一个执行完了就会执行下个任务。
区别在于:
applyToEither
:将已经执行完成的任务结果作为方法入参,传递到指定方法中,有返回值
acceptEither
:将已经执行完成的任务结果作为方法入参,传递到指定方法中,无返回值
runAfterEither
:不会把执行结果当做方法入参,没有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private static void acceptEither () throws Exception { Random random = new Random(); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(random.nextInt(200 ), TimeUnit.MILLISECONDS); System.out.println("supplyAsync1线程名称: " + Thread.currentThread().getName() + "。结果为:100" ); return 100 ; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(random.nextInt(200 ), TimeUnit.MILLISECONDS); System.out.println("supplyAsync2线程名称: " + Thread.currentThread().getName() + "。结果为:200" ); return 200 ; }); CompletableFuture<Void> future = f1.acceptEither(f2, arg -> { System.out.println("acceptEither线程名称: " + Thread.currentThread().getName() + "。参数为:" + arg + "。无返回值" ); }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
anyOf 1 public static CompletableFuture<Object> anyOf (CompletableFuture<?>... cfs)
任意一个任务执行完,就执行anyOf
返回的CompletableFuture
。如果执行的任务异常,anyOf
的CompletableFuture
执行get()
方法会抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static void anyOf () throws Exception { Random random = new Random(); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(random.nextInt(200 ), TimeUnit.MILLISECONDS); System.out.println("supplyAsync1线程名称: " + Thread.currentThread().getName() + "。结果为:100" ); return 100 ; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(random.nextInt(100 ), TimeUnit.MILLISECONDS); System.out.println("supplyAsync2线程名称: " + Thread.currentThread().getName() + "。结果为:200" ); return 200 ; }); CompletableFuture<Object> future = CompletableFuture.anyOf(f1, f2).whenComplete((arg, throwable) -> { System.out.println("anyOf线程名称: " + Thread.currentThread().getName() + "。参数为:" + arg + "。无返回值" ); }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
allOf 1 public static CompletableFuture<Void> allOf (CompletableFuture<?>... cfs)
所有任务都执行完成后,才执行allOf
返回的CompletableFuture
。如果任意一个任务异常,allOf
的CompletableFuture
执行get()
方法都会抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static void allOf () throws Exception { Random random = new Random(); CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(random.nextInt(200 ), TimeUnit.MILLISECONDS); System.out.println("supplyAsync1线程名称: " + Thread.currentThread().getName() + "。结果为:100" ); return 100 ; }); CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(random.nextInt(100 ), TimeUnit.MILLISECONDS); System.out.println("supplyAsync2线程名称: " + Thread.currentThread().getName() + "。结果为:200" ); return 200 ; }); CompletableFuture<Void> future = CompletableFuture.allOf(f1, f2).whenComplete((unused, throwable) -> { System.out.println("allOf线程名称: " + Thread.currentThread().getName() + "。参数为:" + unused + "。无返回值" ); }); System.out.println("阻塞获取结果。结果为:" + future.get()); }
thenCompose 1 public <U> CompletableFuture<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn)
在某个任务执行完成后,将该任务的执行结果作为方法入参去执行指定的方法。该方法会返回一个新的CompletableFuture
实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static void thenCompose () throws Exception { Random random = new Random(); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(random.nextInt(200 ), TimeUnit.MILLISECONDS); System.out.println("supplyAsync线程名称: " + Thread.currentThread().getName() + "。结果为:100" ); return 100 ; }).thenCompose(arg -> CompletableFuture.supplyAsync(() -> { Uninterruptibles.sleepUninterruptibly(random.nextInt(100 ), TimeUnit.MILLISECONDS); System.out.println("thenCompose线程名称: " + Thread.currentThread().getName() + "。接收参数为:" + arg + "。返回:200" ); return 200 ; })); System.out.println("阻塞获取结果。结果为:" + future.get()); }
注意事项 CompletableFuture.get方法是阻塞的 CompletableFuture
的get()
方法是阻塞的,如果使用它来获取异步调用的返回值,最好添加超时时间。
CompletableFuture.get():获取返回值抛出异常。
CompletableFuture.join():获取返回值不抛出异常。
线程池 CompletableFuture
默认使用ForkJoinPool.commonPool
线程池,核心数量为服务器CPU-1 。当有大量请求处理且任务耗时较久时就会响应很慢。建议使用自定义线程池,配置自定义线程池参数。
Future需要获取返回值才能获取异常信息 1 2 3 4 5 6 CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return 100 / 0 ; }).thenAccept(System.out::println); future.get();
Future
需要获取返回值,才能获取到异常信息。如果不加 get()
/join()
方法,看不到异常信息。使用的时候需要考虑是否加try...catch...
或者使用exceptionally
方法。