使用线程池这么久,自认为很熟悉,居然还能踩到坑!问题原因:线程池使用 FutureTask
的时候如果拒绝策略设置为了 DiscardPolicy或DiscardOldestPolicy
并且在被拒绝的任务 Future
对象上调用无参 get
方法那么调用线程会一直被阻塞。
问题复现 先来看一段代码进行复现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class FutureTest { private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1 , 1 , 0 , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 ), new NamedThreadFactory("executor" ), new ThreadPoolExecutor.DiscardPolicy()); public static void main (String[] args) throws ExecutionException, InterruptedException { Future<?> first = EXECUTOR.submit(() -> { System.out.println("first task start" ); Uninterruptibles.sleepUninterruptibly(3 , TimeUnit.SECONDS); }); Future<?> second = EXECUTOR.submit(() -> { System.out.println("second task start" ); Uninterruptibles.sleepUninterruptibly(3 , TimeUnit.SECONDS); }); Future<?> third = null ; try { third = EXECUTOR.submit(() -> { System.out.println("first task start" ); Uninterruptibles.sleepUninterruptibly(3 , TimeUnit.SECONDS); }); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); } System.out.println("first task output. " + first.get()); System.out.println("second task output. " + second.get()); System.out.println("third task output. " + (third == null ? null : third.get())); EXECUTOR.shutdown(); } }
执行结果为下图所示,third task
一直没有执行,且主线程阻塞在third.get()
1 2 3 4 first task start first task output. null second task start second task output. null
流程分析
创建一个核心线程数为1且队列大小为1的线程池,设置拒绝策略为DiscardPolicy
。
向线程池提交first任务,线程池会使用核心线程池执行该任务,任务将会阻塞3s。
向线程池提交second任务,线程池将任务放到队列中。
向线程池提交third任务,线程池已满,线程池采用DiscardPolicy
丢弃任务。
等待first任务执行完毕后主线程打印 first task output. null
first任务执行完成关闭后,线程池从队列中取出second任务执行,主线程打印second task output. null
third任务会一直阻塞,程序不会结束。如果把拒绝策略修改为DiscardOldestPolicy
,也会出现同样的问题。
将拒绝策略修改为AbortPolicy
后输出结果如下,线程池正常关闭。
1 2 3 4 5 6 first task start Task java.util.concurrent.FutureTask@32c4e8b2 rejected from java.util.concurrent.ThreadPoolExecutor@64bce832[Running, pool size = 1 , active threads = 1 , queued tasks = 1 , completed tasks = 0 ] first task output. null second task start second task output. null third task output. null
调用逻辑梳理 当提交任务到线程池中时
1 2 3 4 5 6 7 8 9 10 11 12 13 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
上述是任务不断加入线程池的流程处理,其中需要研究的就是最后一步拒绝策略对任务的影响。
1 2 3 4 5 6 7 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } }
当拒绝策略使用DiscardPolicy
时发现什么都没做。但是当把 Runnable
包装为Future
对象时,Future
是有状态的,Future
中的状态如下:
1 2 3 4 5 6 7 private static final int NEW = 0 ;private static final int COMPLETING = 1 ;private static final int NORMAL = 2 ;private static final int EXCEPTIONAL = 3 ;private static final int CANCELLED = 4 ;private static final int INTERRUPTING = 5 ;private static final int INTERRUPTED = 6 ;
在把Runnable
包装为Future
对象的时候使用newTaskFor
方法转换Runnable
为FutureTask
,而FutureTask
的构造函数里面设置的线程状态就是New
。所以使用DiscardPolicy
策略提交后返回了一个状态为NEW
的Future
对象。
1 2 3 4 public FutureTask (Runnable runnable, V result) { this .callable = Executors.callable(runnable, result); this .state = NEW; }
当调用Future
的无参get
方法时逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); } private V report (int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
也就是说当Future
的状态 >COMPLETING
时候调用get
方法才会正常返回,而DiscardPolicy
策略在拒绝任务的时候并没有设置future
的状态,后面也没有其他机会可以设置该future
的状态,所以future
的状态一直是NEW
,导致任务阻塞,一直不会返回。同理DiscardOldestPolicy
策略也是这样的问题,最老的任务被淘汰时没有设置淘汰任务future
的状态。
默认的AbortPolicy
策略当任务超出后会直接会抛出RejectedExecutionException
异常,也就是submit
方法并没有返回future
对象,这时候third
是null
,可以正常返回。
结论 当使用Future
的时候,尽量使用带超时时间的get
方法,这样即使使用了DiscardPolicy
拒绝策略也不至于任务一直等待,超时时间到了会自动返回,如果非要使用不带参数的get
方法则可以重写DiscardPolicy
的拒绝策略,在执行策略时候设置该Future
的状态大于COMPLETING
即可,但是查看FutureTask
提供的方法发现只有cancel
方法是public
的并且可以设置FutureTask
的状态大于COMPLETING
,重写拒绝策略的线程池具体代码如下:
1 2 3 4 5 6 private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1 , 1 , 0 , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 ), new NamedThreadFactory("executor" ), (r, executor) -> { if (r instanceof FutureTask) { ((FutureTask<?>) r).cancel(true ); } });
使用这个策略后,Future.isCancelled
方法可判断线程是否已经取消,所以可以将代码修改为:
1 2 System.out.println("third task output. " + (third.isCancelled() ? "拒绝了" : third.get()));