使用线程池这么久,自认为很熟悉,居然还能踩到坑!问题原因:线程池使用 FutureTask的时候如果拒绝策略设置为了 DiscardPolicy或DiscardOldestPolicy并且在被拒绝的任务 Future对象上调用无参 get方法那么调用线程会一直被阻塞。

问题复现

先来看一段代码进行复现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class FutureTest {

/**
* 创建核心数量为1,队列个数为1的线程池。拒绝策略为DiscardPolicy。
*/
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1), new NamedThreadFactory("executor"), new ThreadPoolExecutor.DiscardPolicy());

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 添加任务一
Future<?> first = EXECUTOR.submit(() -> {
System.out.println("first task start");
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
});

// 添加任务二
Future<?> second = EXECUTOR.submit(() -> {
System.out.println("second task start");
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
});

// 添加任务、三
Future<?> third = null;
try {
third = EXECUTOR.submit(() -> {
System.out.println("first task start");
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
});
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
}

// 等待任务一执行完毕
System.out.println("first task output. " + first.get());
// 等待任务二执行完毕
System.out.println("second task output. " + second.get());
// 等待任务三执行完毕
System.out.println("third task output. " + (third == null ? null : third.get()));

// 关闭线程池
EXECUTOR.shutdown();
}
}

执行结果为下图所示,third task 一直没有执行,且主线程阻塞在third.get()

1
2
3
4
first task start
first task output. null
second task start
second task output. null

流程分析

  1. 创建一个核心线程数为1且队列大小为1的线程池,设置拒绝策略为DiscardPolicy
  2. 向线程池提交first任务,线程池会使用核心线程池执行该任务,任务将会阻塞3s。
  3. 向线程池提交second任务,线程池将任务放到队列中。
  4. 向线程池提交third任务,线程池已满,线程池采用DiscardPolicy丢弃任务。
  5. 等待first任务执行完毕后主线程打印 first task output. null
  6. first任务执行完成关闭后,线程池从队列中取出second任务执行,主线程打印second task output. null
  7. third任务会一直阻塞,程序不会结束。如果把拒绝策略修改为DiscardOldestPolicy,也会出现同样的问题。

将拒绝策略修改为AbortPolicy后输出结果如下,线程池正常关闭。

1
2
3
4
5
6
first task start
Task java.util.concurrent.FutureTask@32c4e8b2 rejected from java.util.concurrent.ThreadPoolExecutor@64bce832[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
first task output. null
second task start
second task output. null
third task output. null

调用逻辑梳理

当提交任务到线程池中时

1
2
3
4
5
6
7
8
9
10
11
12
13
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 包装Runnable为Future对象
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
// 返回Future对象
return ftask;
}

// 包装Runnable为Future对象
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 任务数量小于核心线程池数量时新增线程处理
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 任务数量达到核心线程数量时,将任务放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 尝试新增线程处理任务
else if (!addWorker(command, false))
// 新增失败,调用拒绝策略
reject(command);
}

上述是任务不断加入线程池的流程处理,其中需要研究的就是最后一步拒绝策略对任务的影响。

1
2
3
4
5
6
7
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }
// 什么都没做
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

当拒绝策略使用DiscardPolicy时发现什么都没做。但是当把 Runnable包装为Future对象时,Future是有状态的,Future中的状态如下:

1
2
3
4
5
6
7
private static final int NEW          = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

在把Runnable包装为Future对象的时候使用newTaskFor方法转换RunnableFutureTask,而FutureTask的构造函数里面设置的线程状态就是New。所以使用DiscardPolicy策略提交后返回了一个状态为NEWFuture对象。

1
2
3
4
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

当调用Future的无参get方法时逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 当状态值 <= COMPLETING时需要等待,否者调用report返回
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

private V report(int s) throws ExecutionException {
Object x = outcome;
// 状态值为NORMAL正常返回
if (s == NORMAL)
return (V)x;
// 状态值大于等于CANCELLED则抛异常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

也就是说当Future的状态 >COMPLETING时候调用get方法才会正常返回,而DiscardPolicy策略在拒绝任务的时候并没有设置future的状态,后面也没有其他机会可以设置该future的状态,所以future的状态一直是NEW,导致任务阻塞,一直不会返回。同理DiscardOldestPolicy策略也是这样的问题,最老的任务被淘汰时没有设置淘汰任务future的状态。

默认的AbortPolicy策略当任务超出后会直接会抛出RejectedExecutionException异常,也就是submit方法并没有返回future对象,这时候thirdnull,可以正常返回。

结论

当使用Future的时候,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于任务一直等待,超时时间到了会自动返回,如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略,在执行策略时候设置该Future的状态大于COMPLETING即可,但是查看FutureTask提供的方法发现只有cancel方法是public的并且可以设置FutureTask的状态大于COMPLETING,重写拒绝策略的线程池具体代码如下:

1
2
3
4
5
6
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1), new NamedThreadFactory("executor"), (r, executor) -> {
if (r instanceof FutureTask) {
((FutureTask<?>) r).cancel(true);
}
});

使用这个策略后,Future.isCancelled方法可判断线程是否已经取消,所以可以将代码修改为:

1
2
// 等待任务三执行完毕
System.out.println("third task output. " + (third.isCancelled() ? "拒绝了" : third.get()));