ExecutorCompletionService 是JDK1. 6中新增的异步类,可获取异步执行的结果。有着相同功能的ExcutorService
中Future.get
方法是阻塞的直到返回结果,也就是顺序执行get
方法,即使后续任务先执行完成也会阻塞在前面的任务的get
方法。而ExecutorCompletionService
执行结果无序且线程池中先执行完成的任务会先执行后续的逻辑,不会发生阻塞。
异步任务获取结果方式 多线程异步任务获取结果最常见的方式莫过于重写Callable
接口,然后通过future.get()
获取结果。但这种方法弊端很明显,get
方法会产生阻塞,导致任务耗时增加。当前有三种方法可以实现异步任务获取结果:
重写Callable
,通过future.get
获取结果。
CompletableFuture
异步通过join
方法获取结果。
通过ExecutorCompletionService
获取结果。
Callable 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 private final static ExecutorService EXECUTOR = Executors.newFixedThreadPool(3 );public static void main (String[] args) { method1(); } @SneakyThrows private static void method1 () { List<Task> tasks = getTasks(); long start = System.currentTimeMillis(); for (Future<Integer> future : EXECUTOR.invokeAll(tasks)) { Integer result = future.get(); Uninterruptibles.sleepUninterruptibly(1 , TimeUnit.SECONDS); System.out.println("任务返回结果:" + result); } System.out.println("共耗时:" + (System.currentTimeMillis() - start)); EXECUTOR.shutdown(); } @SneakyThrows private static void method2 () { List<Task> tasks = getTasks(); long start = System.currentTimeMillis(); List<Future<Integer>> futures = new ArrayList<>(); for (Task task : tasks) { futures.add(EXECUTOR.submit(task)); } for (Future<Integer> future : futures) { Integer result = future.get(); Uninterruptibles.sleepUninterruptibly(1 , TimeUnit.SECONDS); System.out.println("任务返回结果:" + result); } System.out.println("共耗时:" + (System.currentTimeMillis() - start)); EXECUTOR.shutdown(); } public static List<Task> getTasks () { List<Task> tasks = new ArrayList<>(); tasks.add(new Task(5 )); tasks.add(new Task(3 )); tasks.add(new Task(1 )); return tasks; } static class Task implements Callable <Integer > { public int time; public Task (int time) { this .time = time; } @Override public Integer call () { Uninterruptibles.sleepUninterruptibly(time, TimeUnit.SECONDS); return time; } }
method1
和method2
执行结果相同,输出结果如下:
1 2 3 4 任务返回结果:5 任务返回结果:3 任务返回结果:1 共耗时:8039
为什么说future.get()
阻塞获取结果,可以通过下图看出,只有等任务1 get
到任务结果并执行完成后续所有业务逻辑后才会轮到下一个任务执行后续逻辑,且get
方法按照提交顺序获取结果。总结为:先添加先处理 。
CompletableFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 private final static ExecutorService EXECUTOR = Executors.newFixedThreadPool(3 );public static void main (String[] args) { completableFuture(); } private static void completableFuture () { List<Task> tasks = getTasks(); long start = System.currentTimeMillis(); tasks.parallelStream().map(task -> CompletableFuture.supplyAsync(task::call, EXECUTOR)) .collect(Collectors.toList()) .parallelStream() .map(CompletableFuture::join) .forEach(result -> { Uninterruptibles.sleepUninterruptibly(1 , TimeUnit.SECONDS); System.out.println("任务返回结果:" + result); }); System.out.println("共耗时:" + (System.currentTimeMillis() - start)); EXECUTOR.shutdown(); } public static List<Task> getTasks () { List<Task> tasks = new ArrayList<>(); tasks.add(new Task(5 )); tasks.add(new Task(3 )); tasks.add(new Task(1 )); return tasks; } static class Task implements Callable <Integer > { public int time; public Task (int time) { this .time = time; } @Override public Integer call () { Uninterruptibles.sleepUninterruptibly(time, TimeUnit.SECONDS); return time; } }
输出结果如下:
1 2 3 4 任务返回结果:1 任务返回结果:3 任务返回结果:5 共耗时:6078
CompletableFuture
把Task.call
作为普通方法调用执行,将外层包装为CompletableFuture.supplyAsync
获取结果。
ExecutorCompletionService 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 private final static ExecutorService EXECUTOR = Executors.newFixedThreadPool(3 );public static void main (String[] args) { executorCompletionService(); } @SneakyThrows private static void executorCompletionService () { List<Task> tasks = getTasks(); long start = System.currentTimeMillis(); ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(EXECUTOR); for (Task task : tasks) { completionService.submit(task); } for (Task task : tasks) { Integer time = completionService.take().get(); Uninterruptibles.sleepUninterruptibly(1 , TimeUnit.SECONDS); System.out.println("任务返回结果:" + time); } System.out.println("共耗时:" + (System.currentTimeMillis() - start)); EXECUTOR.shutdown(); } public static List<Task> getTasks () { List<Task> tasks = new ArrayList<>(); tasks.add(new Task(5 )); tasks.add(new Task(3 )); tasks.add(new Task(1 )); return tasks; } static class Task implements Callable <Integer > { public int time; public Task (int time) { this .time = time; } @Override public Integer call () { Uninterruptibles.sleepUninterruptibly(time, TimeUnit.SECONDS); return time; } }
输出结果如下
1 2 3 4 任务返回结果:1 任务返回结果:3 任务返回结果:5 共耗时:6030
可以看到ExecutorCompletionService
比Callable
在性能有一定提升。ExecutorCompletionService
先执行完成线程会继续执行后续业务逻辑,并不会产生阻塞。总结为:谁快谁优先 。
小结 获取异步线程执行结果性能排行
ExecutorCompletionService
CompletableFuture
Callable
解析ExecutorCompletionService 方法解析 ExecutorCompletionService
实现了CompletionService
接口,且CompletionService
只有ExecutorCompletionService
一个实现类,CompletionService
中只有5个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public interface CompletionService <V > { Future<V> submit (Callable<V> task) ; Future<V> submit (Runnable task, V result) ; Future<V> take () throws InterruptedException ; Future<V> poll () ; Future<V> poll (long timeout, TimeUnit unit) throws InterruptedException ; }
源码解析 三个私有属性
两个构造方法 可通过ExecutorCompletionService
的构造方法指定已完成队列的类型,默认为LinkedBlockingQueue
。
任务提交 QueueingFuture
继承了FutureTask
,FutureTask
重写了Runnable
的run()
方法,无论是set()
正常结果,还是setException()
结果,都会调用 finishCompletion()
方法。
任务执行流程
这里执行的done
方法,实际执行的是QueueingFuture
的done
方法。至此,当一个任务执行完成或异常的时候,都会被添加到已完成阻塞队列中,进而被取出处理。
获取结果 FutureTask
的任务完成后执行QueueingFuture.done
将已完成的结果存储到队列中,可通过take
、poll
方法直接从已完成队列中获取结果。
使用场景
多线程执行有返回值的任务。
同类服务调用,优先获取先返回任务的结果(如调用不同厂商的定位服务,使用耗时最短、最先返回的结果)。
获取任务集合的第一个结果后取消其他任务(如多中心文件下载,下载完成后终止其他下载线程)。
ExecutorCompletionService
doc中也给出了两个例子:
假设您有一组针对某个问题的求解器,每个求解器都返回某种Result类型的值,并希望同时运行它们,处理它们中每个返回非空值的结果,在某些方法中use(Result r) 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void solve (Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); for (Callable<Result> s : solvers) ecs.submit(s); int n = solvers.size(); for (int i = 0 ; i < n; ++i) { Result r = ecs.take().get(); if (r != null ) use(r); } }
假设您想使用任务集的第一个非空结果,忽略任何遇到异常的结果,并在第一个任务准备好时取消所有其他任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void solve (Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null ; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0 ; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null ) { result = r; break ; } } catch (ExecutionException ignore) {} } } finally { for (Future<Result> f : futures) f.cancel(true ); } if (result != null ) use(result); }