使用线程池这么久,自认为很熟悉,居然还能踩到坑!问题原因:线程池使用 FutureTask的时候如果拒绝策略设置为了 DiscardPolicy或DiscardOldestPolicy并且在被拒绝的任务 Future对象上调用无参 get方法那么调用线程会一直被阻塞。
问题复现 先来看一段代码进行复现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class FutureTest { private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1 , 1 , 0 , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 ), new NamedThreadFactory("executor" ), new ThreadPoolExecutor.DiscardPolicy()); public static void main (String[] args) throws ExecutionException, InterruptedException { Future<?> first = EXECUTOR.submit(() -> { System.out.println("first task start" ); Uninterruptibles.sleepUninterruptibly(3 , TimeUnit.SECONDS); }); Future<?> second = EXECUTOR.submit(() -> { System.out.println("second task start" ); Uninterruptibles.sleepUninterruptibly(3 , TimeUnit.SECONDS); }); Future<?> third = null ; try { third = EXECUTOR.submit(() -> { System.out.println("first task start" ); Uninterruptibles.sleepUninterruptibly(3 , TimeUnit.SECONDS); }); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); } System.out.println("first task output. " + first.get()); System.out.println("second task output. " + second.get()); System.out.println("third task output. " + (third == null ? null : third.get())); EXECUTOR.shutdown(); } }
执行结果为下图所示,third task 一直没有执行,且主线程阻塞在third.get()
1 2 3 4 first task start first task output. null second task start second task output. null
流程分析
创建一个核心线程数为1且队列大小为1的线程池,设置拒绝策略为DiscardPolicy。
向线程池提交first任务,线程池会使用核心线程池执行该任务,任务将会阻塞3s。
向线程池提交second任务,线程池将任务放到队列中。
向线程池提交third任务,线程池已满,线程池采用DiscardPolicy丢弃任务。
等待first任务执行完毕后主线程打印 first task output. null
first任务执行完成关闭后,线程池从队列中取出second任务执行,主线程打印second task output. null
third任务会一直阻塞,程序不会结束。如果把拒绝策略修改为DiscardOldestPolicy,也会出现同样的问题。
将拒绝策略修改为AbortPolicy后输出结果如下,线程池正常关闭。
1 2 3 4 5 6 first task start Task java.util.concurrent.FutureTask@32c4e8b2 rejected from java.util.concurrent.ThreadPoolExecutor@64bce832[Running, pool size = 1 , active threads = 1 , queued tasks = 1 , completed tasks = 0 ] first task output. null second task start second task output. null third task output. null
调用逻辑梳理 当提交任务到线程池中时
1 2 3 4 5 6 7 8 9 10 11 12 13 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
上述是任务不断加入线程池的流程处理,其中需要研究的就是最后一步拒绝策略对任务的影响。
1 2 3 4 5 6 7 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } }
当拒绝策略使用DiscardPolicy时发现什么都没做。但是当把 Runnable包装为Future对象时,Future是有状态的,Future中的状态如下:
1 2 3 4 5 6 7 private static final int NEW = 0 ;private static final int COMPLETING = 1 ;private static final int NORMAL = 2 ;private static final int EXCEPTIONAL = 3 ;private static final int CANCELLED = 4 ;private static final int INTERRUPTING = 5 ;private static final int INTERRUPTED = 6 ;
在把Runnable包装为Future对象的时候使用newTaskFor方法转换Runnable为FutureTask,而FutureTask的构造函数里面设置的线程状态就是New。所以使用DiscardPolicy策略提交后返回了一个状态为NEW的Future对象。
1 2 3 4 public FutureTask (Runnable runnable, V result) { this .callable = Executors.callable(runnable, result); this .state = NEW; }
当调用Future的无参get方法时逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); } private V report (int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
也就是说当Future的状态 >COMPLETING时候调用get方法才会正常返回,而DiscardPolicy策略在拒绝任务的时候并没有设置future的状态,后面也没有其他机会可以设置该future的状态,所以future的状态一直是NEW,导致任务阻塞,一直不会返回。同理DiscardOldestPolicy策略也是这样的问题,最老的任务被淘汰时没有设置淘汰任务future的状态。
默认的AbortPolicy策略当任务超出后会直接会抛出RejectedExecutionException异常,也就是submit方法并没有返回future对象,这时候third是null,可以正常返回。
结论 当使用Future的时候,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于任务一直等待,超时时间到了会自动返回,如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略,在执行策略时候设置该Future的状态大于COMPLETING即可,但是查看FutureTask提供的方法发现只有cancel方法是public的并且可以设置FutureTask的状态大于COMPLETING,重写拒绝策略的线程池具体代码如下:
1 2 3 4 5 6 private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1 , 1 , 0 , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 ), new NamedThreadFactory("executor" ), (r, executor) -> { if (r instanceof FutureTask) { ((FutureTask<?>) r).cancel(true ); } });
使用这个策略后,Future.isCancelled方法可判断线程是否已经取消,所以可以将代码修改为:
1 2 System.out.println("third task output. " + (third.isCancelled() ? "拒绝了" : third.get()));