深入并发包-线程池

仅供学习交流,如有错误请指出,如要转载请加上出处,谢谢

前言

线程是应用程序执行任务的最小单元,在jvm中允许应用程序在多个线程同时执行,竟可能利用服务器的性能来最大化提高应用程序的吞吐量和响应性,但是往往在生产环境中,如果我们为每一个任务分配一个线程,当创建足够多的时候,就会存在一些明显的缺陷:

  • 线程生命周期开销高:线程的创建和销毁是需要一定的时间,并且需要JVM和操作系统相互辅助操作,在应用程序创建足够多的线程时,这个消耗也是很可观的
  • 资源消耗:活跃的线程会消耗大量的内存,如果你的处理器数量少于你的处理线程的数量,这将会产生大量的闲置线程,这会占用大量的内存,以及在共享资源的竞争上会产生其他的性能开销,同时也给GC带来压力
  • 稳定性:每个平台对于线程都会有一定的限制,对于JVM来讲,在JVM的启动参数或者Thread构造函数请求栈的大小这都对线程的数量造成一定的限制,如果超过了这些限制,很有可能会造成难以恢复的OutOfMemoryError的异常,还有操作系统本身对线程的一些限制,都会造成应用程序的不稳定
    基于以上的一些缺陷,所以需要有一种机制来管理线程,这就出现了Executor框架

Executor框架

Executor框架是JDK1.5提出的,由JAVA大神Doug Lea编写,他很详细的描述了线程的生命周期,现在我们来看一看Executor框架的结构图

Executor

Executor是Executor框架最顶层的接口,也是最核心的功能,我们进入Executor接口的源码(JDK1.8)看一看

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}

由上可知,Executor接口只有一个execute方法,他的功能从注释中可知:就是提交一个任务命令,然后在将来的某个时间段执行该(一组或者一个)任务命令,显而易见,这是个执行线程的方法,他将线程的提交过程和线程的执行过程解耦出来,这就相当于生产者-消费者模式,提交过程相当于生产者,执行过程相当于消费者,然后以一个异步的执行策略来执行任务(当然这并不是严格要求的),这提交和执行过程中要符合内存一致性效果,就是线程中将Runnable对象提交到Executor执行之前要遵循happen-before原则

ExecutorService

在上述的Executor中,只是对于线程的提交,执行的描述,并没有考虑到任务线程的关闭,在线程的整个生命周期中,关闭线程也是非常重要的一部分,如果不能正常的关闭,会导致应用程序一些意想不到的异常,所以ExecutorService的作用是扩展了Executor,完善了任务线程的生命周期的管理,以及跟踪一个或多个异步的任务线程执行状况而生成Future,主要的方法如下:

  • isShutdown() :判断当前应用程序是否关闭,如果已关闭返回true
  • shutdown():启动顺序关闭机制,执行此前提交的任务,不再接受新的任务
  • shutdownNow() :试图立刻执行关闭所有正在执行的任务,并且返回等待执行的任务列表
  • submit(Runnable task):submit是对 Executor接口中的execute方法的一个扩展,使用Future对异步任务线程的执行控制
  • isTerminated() :判断所有的任务是否都已经完成,完成就返回true
  • invokeAny(Collection<? extends Callable> tasks):执行给定的任务,如果某个任务完成则返回该结果
  • invokeAll(Collection<? extends Callable> tasks):执行给定的任务,当所有的任务完成之后,返回保持任务状态和结果的Futrue列表

AbstractExecutorService

AbstractExecutorService是ExecutorService的默认实现,主要是使用RunnableFuture(它是一个Runnable的Futrue,指可以执行Runnable并可以访问其结果)实现submit,invokeAny和invokeAll等方法,而生成RunnableFuture的核心方法就是newTaskFor,他的含义给执行的任务线程生成一个RunnableFuture

ScheduledExecutorService和ScheduledThreadPoolExecutor

ScheduledExecutorService是一种延迟执行的ExecutorService,ScheduledThreadPoolExecutor是其实现类,指安排指定的延迟时间来执行任务,主要方法schedule是指创建并执行在给定延迟时间启用的ScheduledFuture(它是一个scheduled的Future,指一种延迟并可结果化的操作)

ForkJoinPool

ForkJoinPool是对AbstractExecutorService的扩展,是基于分而治之的思想,将复杂的任务异步的分解成多个小任务去执行,该类也是ForkJoin框架核心的类,这里就不在赘述了

ThreadPoolExecutor

任务的状态

ThreadPoolExecutor顾名思义线程池,是Executor框架的主要实现方法,他实现了主要的任务线程执行和关闭过程,下面通过源码来分析一下(基于JDK1.8)

ThreadPoolExecutor定义了五种任务线程的状态,它记录了线程池中的线程的声明周期:

  1. RUNNING:可以接受新的任务,也可以处理阻塞队列里的任务
  2. SHUTDOWN:不接受新的任务,但是可以处理阻塞队列里的任务
  3. STOP:不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务
  4. TIDYING:过渡状态,也就是说所有的任务都执行完了,当前线程池已经没有有效的线程,这个时候线程池的状态将会TIDYING,并且将要调用terminated方法
  5. TERMINATED:终止状态。terminated方法调用完成以后的状态

状态之间可以进行转换:

  1. RUNNING -> SHUTDOWN:手动调用shutdown方法,或者ThreadPoolExecutor要被GC回收的时候调用finalize方法,finalize方法内部也会调用shutdown方法
  2. (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow方法
  3. SHUTDOWN -> TIDYING:当队列和线程池都为空的时候
  4. STOP -> TIDYING:当线程池为空的时候
  5. TIDYING -> TERMINATED:terminated方法调用完成之后

源码定义如下:

1
2
3
4
5
6
7
8
9
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

ThreadPoolExecutor处理某个状态拥有的活跃线程数量是用一个整形常量来表示,用前三位来表示状态,后29位来表示数量,我们先来看COUNT_BITS常量,通过计算可知它的值为29,在java中一个整形占用四个字节,一个字节是八位,所以一个整形占用32位,而COUNT_BITS表示的就是整形中的后29位,它的含义是来存储有效线程的线程数,那状态又怎么表示了,我们来看RUNNING状态:-1<<COUNT_BITS这个左移位运算(丢弃COUNT_BITS数量的最高位,0补最低位),通过-1<<29得到11100000000000000000000000000000,前3位为111,就是表示RUNNING的状态标志,其它四种状态也跟此表示方法一样来标志
上述代码中,常量ctl表示的是初始线程池的状态和数量,默认是RUNNING状态和0个活跃任务线程数量,而CAPACITY表示的是线程池的容量,通过(1<<29)-1运算获得,清楚了任务线程的状态和数量的含义,我们来看一下三个重要的对任务线程的状态和数量操作的内部静态方法

1
2
3
4
5
6
// 得到状态,CAPACITY的非操作得到的二进制位11100000000000000000000000000000,然后做在一个与操作,相当于直接取前3位的的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 得到线程数,也就是后29位的数字。 直接跟CAPACITY做一个与操作即可,CAPACITY就是的值就 1 << 29 - 1 = 00011111111111111111111111111111。 与操作的话前面3位肯定为0,相当于直接取后29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 或操作。相当于更新数量和状态两个操作
private static int ctlOf(int rs, int wc) { return rs | wc; }

任务的初始化

ThreadPoolExecutor提供了初始化的构造方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

  • corePoolSize:线程池中的线程数,如果执行的线程数大于该数量,就进入阻塞队列进行等待
  • maximumPoolSize:线程池允许最大的线程数
  • keepAliveTime:执行的线程数大于该线程池中的线程数,多余的空闲线程在回收前等待新任务的时间
  • unit:时间单位
  • workQueue:保存执行任务的阻塞队列
  • threadFactory:创建线程的线程工厂
  • handler:执行被阻止时使用的处理程序,因为线程已达到线程限制和队列容量

但是JDK并不推荐直接用构造函数来进行线程池的初始化,直接初始化的灵活性和管理上并不好,于是提供了executors线程池工厂类,它应用与各种不同场景下对线程池的初始化需求,主要有如下方法:

  • newFixedThreadPool:创建一个固定数量级的线程池,如果活跃线程数超过线程池数量,它们将在等待队列中知道线程池中的线程可用
  • newSingleThreadExecutor:创建单个线程的线程池,如果该单线程由于故障等原因停止,想要执行后续的任务,则会创建新的线程去执行
  • newCachedThreadPool:创建一个可缓存的线程池,根据需要,如果有新的任务进来,线程池没有多余的线程可用,则在线程池中创建新的线程执行,有就复用已创建的线程,如果线程有60秒未被使用则会从缓存中回收
  • newSingleThreadScheduledExecutor:创建单个线程的线程池,以延迟或定时的方式来执行任务,如果该单线程由于故障等原因停止,想要执行后续的任务,则会创建新的线程去执行
  • newScheduledThreadPool:创建一个固定的线程池,以延迟或者定时的方式来执行任务

任务的执行

ThreadPoolExecutor实现了execute方法,具体代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

执行execute方法总共有三个步骤,也就是上述代码的三个判断

  1. if (workerCountOf(c) < corePoolSize) :如果当前的工作线程数量少于线程池的基本数量,则直接创建新的工作线程执行,调用addWorker方法
  2. if (isRunning(c) && workQueue.offer(command)):如果当前的工作线程数量大于等于线程池的基本数量,且是RUNNING的状态,就加入等待阻塞队列,如果成功的话,再进行第二次验证,如果在阻塞队列中由于另一个线程关闭线程池或者线程出现死亡了,这个时候线程并不在RUNNING状态,就把刚加入的线程remove掉,成功的话就调用reject方法,否则判断工程线程是否为0,是就调用addWorker()加入一个新线程
  3. 如果放进阻塞等待队列失败的话,那我们尝试添加一个新的线程执行,如果失败的话,则应该是线程池饱和或者关闭了,调用reject方法

注意:在addWorker方法中,第二boolean参数的true代表以corePoolSize为标准,false以maximumPoolSize为基准

execute方法的核心就是在线程池中如何启动一个线程,也就是addWorker方法,在深入addWorker方法之前先要了解线程在线程池的表现的基本单元载体类Worker,Worker是一个内部类,我们来看看他的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 使用ThreadFactory构造Thread,这个构造的Thread内部的Runnable就是本身,也就是Worker。所以得到Worker的thread并start的时候,会执行Worker的run方法,也就是执行ThreadPoolExecutor的runWorker方法
//把状态位设置成-1,这样任何线程都不能得到Worker的锁,除非调用了unlock方法。这个unlock方法会在runWorker方法中一开始就调用,这是为了确保Worker构造出来之后,没有任何线程能够得到它的锁,除非调用了runWorker之后,其他线程才能获得Worker的锁
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

worker是线程池的执行单元,实现了AQS同步锁和runnable接口,接下来看一下addWorker源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 两个参数,firstTask表示需要跑的任务。boolean类型的core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
// 返回值是boolean类型,true表示新任务被接收了,并且执行了。否则是false
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 线程池当前状态
// 这个判断转换成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)。
// 概括为3个条件:
// 1. 线程池不在RUNNING状态并且状态是STOP、TIDYING或TERMINATED中的任意一种状态
// 2. 线程池不在RUNNING状态,线程池接受了新的任务
// 3. 线程池不在RUNNING状态,阻塞队列为空。 满足这3个条件中的任意一个的话,拒绝执行任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 线程池线程个数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 如果线程池线程数量超过线程池最大容量或者线程数量超过了基本大小(core参数为true,core参数为false的话判断超过最大大小)
return false; // 超过直接返回false
if (compareAndIncrementWorkerCount(c)) // 没有超过各种大小的话,cas操作线程池线程数量+1,cas成功的话跳出循环
break retry;
c = ctl.get(); // 重新检查状态
if (runStateOf(c) != rs) // 如果状态改变了,重新循环操作
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 走到这一步说明cas操作成功了,线程池线程数量+1
boolean workerStarted = false; // 任务是否成功启动标识
boolean workerAdded = false; // 任务是否添加成功标识
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock; // 得到线程池的可重入锁
w = new Worker(firstTask); // 基于任务firstTask构造worker
final Thread t = w.thread; // 使用Worker的属性thread,这个thread是使用ThreadFactory构造出来的
if (t != null) { // ThreadFactory构造出的Thread有可能是null,做个判断
mainLock.lock(); // 锁住,防止并发
try {
// 在锁住之后再重新检测一下状态
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 如果线程池在RUNNING状态或者线程池在SHUTDOWN状态并且任务是个null
if (t.isAlive()) // 判断线程是否还活着,也就是说线程已经启动并且还没死掉
throw new IllegalThreadStateException(); // 如果存在已经启动并且还没死的线程,抛出异常
workers.add(w); // worker添加到线程池的workers属性中,是个HashSet
int s = workers.size(); // 得到目前线程池中的线程个数
if (s > largestPoolSize) // 如果线程池中的线程个数超过了线程池中的最大线程数时,更新一下这个最大线程数
largestPoolSize = s;
workerAdded = true; // 标识一下任务已经添加成功
}
} finally {
mainLock.unlock(); // 解锁
}
if (workerAdded) { // 如果任务添加成功,运行任务,改变一下任务成功启动标识
t.start(); // 启动线程,这里的t是Worker中的thread属性,所以相当于就是调用了Worker的run方法
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果任务启动失败,调用addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}

在上述的代码中,如果任务添加成功,就会执行t.start()方法,也就是执行worker中的run方法,而worker中的run方法调用了runWorker(this)方法,接下来看一下runWorker方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 得到当前线程
Runnable task = w.firstTask; // 得到Worker中的任务task,也就是用户传入的task
w.firstTask = null; // 将Worker中的任务置空
w.unlock(); // allow interrupts。
boolean completedAbruptly = true;
try {
// 如果worker中的任务不为空,继续知否,否则使用getTask获得任务。一直死循环,除非得到的任务为空才退出
while (task != null || (task = getTask()) != null) {
w.lock(); // 如果拿到了任务,给自己上锁,表示当前Worker已经要开始执行任务了,已经不是闲置Worker(闲置Worker的解释请看下面的线程池关闭)
// 在执行任务之前先做一些处理。 1. 如果线程池已经处于STOP状态并且当前线程没有被中断,中断线程 2. 如果线程池还处于RUNNING或SHUTDOWN状态,并且当前线程已经被中断了,重新检查一下线程池状态,如果处于STOP状态并且没有被中断,那么中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 任务执行前需要做什么,ThreadPoolExecutor是个空实现
Throwable thrown = null;
try {
task.run(); // 真正的开始执行任务,调用的是run方法,而不是start方法。这里run的时候可能会被中断,比如线程池调用了shutdownNow方法
} catch (RuntimeException x) { // 任务执行发生的异常全部抛出,不在runWorker中处理
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 任务执行结束需要做什么,ThreadPoolExecutor是个空实现
}
} finally {
task = null;
w.completedTasks++; // 记录执行任务的个数
w.unlock(); // 执行完任务之后,解锁,Worker变成闲置Worker
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 回收Worker方法
}
}

在runWorker方法中,如果任务为空,while表达式有个轮询的方式去获取任务,如果task为null的话,会去获取新的任务,接下来看一下获取任务的方法getTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Runnable getTask() {
boolean timedOut = false; // 如果使用超时时间并且也没有拿到任务的标识
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池是SHUTDOWN状态并且阻塞队列为空的话,worker数量减一,直接返回null(SHUTDOWN状态还会处理阻塞队列任务,但是阻塞队列为空的话就结束了),如果线程池是STOP状态的话,worker数量建议,直接返回null(STOP状态不处理阻塞队列任务)[方法一开始注释的2,3两点,返回null,开始Worker回收]
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // 标记从队列中取任务时是否设置超时时间,如果为true说明这个worker可能需要回收,为false的话这个worker会一直存在,并且阻塞当前线程等待阻塞队列中有数据
for (;;) {
int wc = workerCountOf(c); // 得到当前线程池Worker个数
// allowCoreThreadTimeOut属性默认为false,表示线程池中的核心线程在闲置状态下还保留在池中;如果是true表示核心线程使用keepAliveTime这个参数来作为超时时间
// 如果worker数量比基本大小要大的话,timed就为true,需要进行回收worker
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed)) // 方法一开始注释的1,4两点,会进行下一步worker数量减一
break;
if (compareAndDecrementWorkerCount(c)) // worker数量减一,返回null,之后会进行Worker回收工作
return null;
c = ctl.get(); // 重新检查线程池状态
if (runStateOf(c) != rs) // 线程池状态改变的话重新开始外部循环,否则继续内部循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 如果需要设置超时时间,使用poll方法,否则使用take方法一直阻塞等待阻塞队列新进数据
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false; // 闲置Worker被中断
}
}
}

任务线程会从阻塞队列中获取,如果发生如下情况,那么worker需要被回收:

  1. Worker个数比线程池最大大小要大
  2. 线程池处于STOP状态
  3. 线程池处于SHUTDOWN状态并且阻塞队列为空
  4. 使用超时时间从阻塞队列里拿数据,并且超时之后没有拿到数据(allowCoreThreadTimeOut || workerCount > corePoolSize)

回收会在runWorker方法中发生如上事件抛出异常,在finally块中调用processWorkerExit方法进行回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果Worker没有正常结束流程调用processWorkerExit方法,worker数量减一。如果是正常结束的话,在getTask方法里worker数量已经减一了
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁,防止并发问题
try {
completedTaskCount += w.completedTasks; // 记录总的完成任务数
workers.remove(w); // 线程池的worker集合删除掉需要回收的Worker
} finally {
mainLock.unlock(); // 解锁
}
tryTerminate(); // 尝试结束线程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 如果线程池还处于RUNNING或者SHUTDOWN状态
if (!completedAbruptly) { // Worker是正常结束流程的话
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 不需要新开一个Worker
}
// 新开一个Worker代替原先的Worker
// 新开一个Worker需要满足以下3个条件中的任意一个:
// 1. 用户执行的任务发生了异常
// 2. Worker数量比线程池基本大小要小
// 3. 阻塞队列不空但是没有任何Worker在工作
addWorker(null, false);
}
}

在回收Worker的时候线程池会尝试结束自己的运行,tryTerminate方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 满足3个条件中的任意一个,不终止线程池
// 1. 线程池还在运行,不能终止
// 2. 线程池处于TIDYING或TERMINATED状态,说明已经在关闭了,不允许继续处理
// 3. 线程池处于SHUTDOWN状态并且阻塞队列不为空,这时候还需要处理阻塞队列的任务,不能终止线程池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 走到这一步说明线程池已经不在运行,阻塞队列已经没有任务,但是还要回收正在工作的Worker
if (workerCountOf(c) != 0) {
// 由于线程池不运行了,调用了线程池的关闭方法,在解释线程池的关闭原理的时候会说道这个方法
interruptIdleWorkers(ONLY_ONE); // 中断闲置Worker,直到回收全部的Worker。这里没有那么暴力,只中断一个,中断之后退出方法,中断了Worker之后,Worker会回收,然后还是会调用tryTerminate方法,如果还有闲置线程,那么继续中断
return;
}
// 走到这里说明worker已经全部回收了,并且线程池已经不在运行,阻塞队列已经没有任务。可以准备结束线程池了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁,防止并发
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // cas操作,将线程池状态改成TIDYING
try {
terminated(); // 调用terminated方法
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // terminated方法调用完毕之后,状态变为TERMINATED
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock(); // 解锁
}
// else retry on failed CAS
}
}

ThreadPoolExecutor的关闭

线程池的关闭有两个重要的方法:shutdown和shutdownNow,下面看一下shutdown方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 关闭的时候需要加锁,防止并发
try {
checkShutdownAccess(); // 检查关闭线程池的权限
advanceRunState(SHUTDOWN); // 把线程池状态更新到SHUTDOWN
interruptIdleWorkers(); // 中断闲置的Worker
onShutdown(); // 钩子方法,默认不处理。ScheduledThreadPoolExecutor会做一些处理
} finally {
mainLock.unlock(); // 解锁
}
tryTerminate(); // 尝试结束线程池,上面已经分析过了
}

再看看中断线程的interruptIdleWorkers方法方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 调用他的一个重载方法,传入了参数false,表示要中断所有的正在运行的闲置Worker,如果为true表示只打断一个闲置Worker
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 中断闲置Worker需要加锁,防止并发
try {
for (Worker w : workers) {
Thread t = w.thread; // 拿到worker中的线程
if (!t.isInterrupted() && w.tryLock()) { // Worker中的线程没有被打断并且Worker可以获取锁,这里Worker能获取锁说明Worker是个闲置Worker,在阻塞队列里拿数据一直被阻塞,没有数据进来。如果没有获取到Worker锁,说明Worker还在执行任务,不进行中断(shutdown方法不会中断正在执行的任务)
try {
t.interrupt(); // 中断Worker线程
} catch (SecurityException ignore) {
} finally {
w.unlock(); // 释放Worker锁
}
}
if (onlyOne) // 如果只打断1个Worker的话,直接break退出,否则,遍历所有的Worker
break;
}
} finally {
mainLock.unlock(); // 解锁
}
}

轮询所有的worker,判断是不是中断的线程且可以拿到锁(判断闲置的worker),就中断该线程,现在来看一看另一个关闭的shutdownNow方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// shutdownNow方法会有返回值的,返回的是一个任务列表,而shutdown方法没有返回值
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // shutdownNow操作也需要加锁,防止并发
try {
checkShutdownAccess(); // 检查关闭线程池的权限
advanceRunState(STOP); // 把线程池状态更新到STOP
interruptWorkers(); // 中断Worker的运行
tasks = drainQueue();
} finally {
mainLock.unlock(); // 解锁
}
tryTerminate(); // 尝试结束线程池,上面已经分析过了
return tasks;
}

shutdownNow方法的语义是立刻关闭所有的worker,而不是shutdown方法关闭闲置的方法,而且shutdownNow直接将状态改为STOP,这样的话,不会执行新的任务,同时回收所有的worker,而shutdown只是将状态变为SHUTDOWN,接下来看一看shutdownNow的中断worker方法interruptWorkers():

1
2
3
4
5
6
7
8
9
10
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 中断Worker需要加锁,防止并发
try {
for (Worker w : workers)
w.interruptIfStarted(); // 中断Worker的执行
} finally {
mainLock.unlock(); // 解锁
}
}

Worker的interruptIfStarted方法中断Worker的执行:

1
2
3
4
5
6
7
8
9
10
void interruptIfStarted() {
Thread t;
// Worker无论是否被持有锁,只要还没被中断,那就中断Worker
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt(); // 强行中断Worker的执行
} catch (SecurityException ignore) {
}
}
}

总结

在不同的应用场景下,线程池的生态也越来越大,它的意义是让用户更加高效的应用线程去执行不同的任务,为线程提供完整的生命周期,虽然线程池提供了很好的线程复用机制,但是并不是就可以滥用线程池来创建去执行任务,在操作系统或者JVM平台下都有对于线程一定的限制,合理的使用才能更加的高效!!!

参考

http://www.jianshu.com/p/758a99c83ef1
http://fangjian0423.github.io/2016/03/22/java-threadpool-analysis/
《java并发编程实战》第六章