深入分析JAVA线程池的实现原理
一、前言
线程是属于稀缺资源,在应用中,如果频繁的创建销毁线程,势必会造成大量的CPU资源浪费,因此,合理的使用线程池不仅能加快任务处理速度还可以提高资源利用率,本篇主要基于JDK1.8来深入分析JAVA线程池的核心实现原理,由于能力有限,难免会有遗漏或错误的地方,欢迎读者指正。
二、Exectors
在JAVA里面,主要是通过Exectors框架来提供了多种线程池的实现方式,主要包括下面几种:
newFixedThreadPool:表示会创建一个线程大小固定的线程池,来看demo
public static void main(String[] args) {
Executor excutor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 20; i++) {
excutor.execute(new Task(i));
}
}
static class Task implements Runnable {
private int index;
Task(int index) {
this.index = index;
}
@Override
public void run() {
System.out.println("Thread:" + Thread.currentThread().getName() + "执行任务" + index);
}
}
可能的运行效果:
Thread:pool-1-thread-2执行任务1
Thread:pool-1-thread-4执行任务3
Thread:pool-1-thread-3执行任务2
Thread:pool-1-thread-1执行任务0
Thread:pool-1-thread-6执行任务5
Thread:pool-1-thread-7执行任务6
Thread:pool-1-thread-5执行任务4
Thread:pool-1-thread-5执行任务10
Thread:pool-1-thread-5执行任务11
Thread:pool-1-thread-5执行任务12
Thread:pool-1-thread-5执行任务13
Thread:pool-1-thread-5执行任务14
Thread:pool-1-thread-5执行任务15
Thread:pool-1-thread-5执行任务16
Thread:pool-1-thread-5执行任务17
Thread:pool-1-thread-5执行任务18
Thread:pool-1-thread-2执行任务19
Thread:pool-1-thread-8执行任务7
Thread:pool-1-thread-10执行任务9
Thread:pool-1-thread-9执行任务8
这里创建了一个核心线程数为10的线程池,然后像其提交了20个任务,在我们的实际开发中,线程的数量到底应该设置多少其实没有一个规定,但一般情况下可以考虑参考下面的几个定论
1、假如你的任务是属于CPU密集型,因为线程会占用CPU资源而你的任务又是CPU密集型,所以可以适当的不要创建那么多的线程,比如线程数=CPU核数+1。
2、假如你的任务是属于IO密集型,对于这种类型的任务来说,可以适当的多创建一些线程,比如线程数=CPU核数*2 + 1。
3、假如你的任务是属于混合型,即CPU密集型和IO密集型,建议首先考虑是否可以拆分任务,其次多加测试,压测一下,根据实际情况来确定线程数。
4、假如你的任务是属于高并发,任务执行时间短的这种,可以考虑少创建几个线程池,因为任务执行周期短,并发高,会导致频繁的切换线程上下文。建议线程数=CPU核数+1。
5、假如你的任务属于高并发,任务执行时间长的这种,建议适当多创建几个线程,比如线程数=CPU核数*2 +1。
newSingleThreadExecutor:表示创建一个单个线程大小的线程池,这种线程池相当于单线程串行处理任务,一般情况下使用不多。还是来看一个demo
public static void main(String[] args) {
Executor excutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
excutor.execute(new Task(i));
}
}
static class Task implements Runnable {
private int index;
Task(int index) {
this.index = index;
}
@Override
public void run() {
System.out.println("Thread:" + Thread.currentThread().getName() + "执行任务" + index);
}
}
运行结果:
Thread:pool-1-thread-1执行任务0
Thread:pool-1-thread-1执行任务1
Thread:pool-1-thread-1执行任务2
Thread:pool-1-thread-1执行任务3
Thread:pool-1-thread-1执行任务4
Thread:pool-1-thread-1执行任务5
Thread:pool-1-thread-1执行任务6
Thread:pool-1-thread-1执行任务7
Thread:pool-1-thread-1执行任务8
Thread:pool-1-thread-1执行任务9
Thread:pool-1-thread-1执行任务10
Thread:pool-1-thread-1执行任务11
Thread:pool-1-thread-1执行任务12
Thread:pool-1-thread-1执行任务13
Thread:pool-1-thread-1执行任务14
Thread:pool-1-thread-1执行任务15
Thread:pool-1-thread-1执行任务16
Thread:pool-1-thread-1执行任务17
Thread:pool-1-thread-1执行任务18
Thread:pool-1-thread-1执行任务19
newCachedThreadPool:表示创建一个不限制线程数的线程池,它的最大线程数为Integer.MAX_VALUE,这种类型的线程池平时我一般很少使用,因为极端情况下,如果任务多,线程处理不过来,会导致创建大量的线程,使CPU飙升甚至耗光资源。来看demo
public static void main(String[] args) {
Executor excutor = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
excutor.execute(new Task(i));
}
}
static class Task implements Runnable {
private int index;
Task(int index) {
this.index = index;
}
@Override
public void run() {
System.out.println("Thread:" + Thread.currentThread().getName() + "执行任务" + index);
}
}
可能运行的结果
Thread:pool-1-thread-1执行任务0
Thread:pool-1-thread-2执行任务1
Thread:pool-1-thread-6执行任务5
Thread:pool-1-thread-17执行任务18
Thread:pool-1-thread-8执行任务7
Thread:pool-1-thread-15执行任务16
Thread:pool-1-thread-4执行任务3
Thread:pool-1-thread-13执行任务14
Thread:pool-1-thread-11执行任务12
Thread:pool-1-thread-7执行任务6
Thread:pool-1-thread-9执行任务10
Thread:pool-1-thread-2执行任务9
Thread:pool-1-thread-3执行任务2
Thread:pool-1-thread-1执行任务8
Thread:pool-1-thread-5执行任务4
Thread:pool-1-thread-16执行任务17
Thread:pool-1-thread-18执行任务19
Thread:pool-1-thread-14执行任务15
Thread:pool-1-thread-12执行任务13
Thread:pool-1-thread-10执行任务11
newScheduledThreadPool:表示周期性的执行任务的处理,来看demo
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
long start = new Date().getTime();
System.out.println("scheduleAtFixedRate 开始执行时间:" +
DateFormat.getTimeInstance().format(new Date()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = new Date().getTime();
System.out.println("scheduleAtFixedRate 执行花费时间=" + (end -start)/1000 + "s");
System.out.println("scheduleAtFixedRate 执行完成时间:"
+ DateFormat.getTimeInstance().format(new Date()));
System.out.println("======================================");
}
},1000,5000,TimeUnit.MILLISECONDS);
}
可能运行结果:
scheduleAtFixedRate 开始执行时间:10:23:11
scheduleAtFixedRate 执行花费时间=1s
scheduleAtFixedRate 执行完成时间:10:23:12
======================================
scheduleAtFixedRate 开始执行时间:10:23:16
scheduleAtFixedRate 执行花费时间=1s
scheduleAtFixedRate 执行完成时间:10:23:17
======================================
scheduleAtFixedRate 开始执行时间:10:23:21
scheduleAtFixedRate 执行花费时间=1s
scheduleAtFixedRate 执行完成时间:10:23:22
======================================
scheduleAtFixedRate 开始执行时间:10:23:26
scheduleAtFixedRate 执行花费时间=1s
scheduleAtFixedRate 执行完成时间:10:23:27
======================================
scheduleAtFixedRate 开始执行时间:10:23:31
scheduleAtFixedRate 执行花费时间=1s
scheduleAtFixedRate 执行完成时间:10:23:32
======================================
可以看到,线程周期性的,每隔5s执行一次任务处理。三、ThreadPoolExecutor
在上面分析的4种不同类型的线程池时,其实内部都是通过构建了一个ThreadPoolExecutor对象,在ThreadPoolExecutor的构建中,主要有几个参数需要介绍
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
corePoolSize:表示线程池里面的核心线程数量,当任务过来的时候,如果此时线程池里面的线程数量 < corePoolSize,那么就会创建一个核心线程来处理任务。当任务处理完后,如果线程空闲了,超过了配置的等待时间就会被回收。
maximumPoolSize:表示线程池里面最大的线程数,当任务过来的时候,如果核心线程已满,且任务阻塞队列也满了,就需要继续创建新的线程来处理任务,直到整个线程池里面线程数量达到maximumPoolSize为止。
keepAliveTime:线程空闲时的存活时间,即当线程没有任务处理时,它的存活时间,超过这个存活时间线程会被回收。
unit:时间单位,这个没什么好说的
workQueue:任务阻塞队列,当线程池里面的核心线程被使用完之后,如果还有任务继续过来,会被首先加入到阻塞队列里面等待被处理,Java提供了多种阻塞队列
其中ArrayBlockingQueue底层是基于数组实现的有界阻塞队列。
LinkedBlockingQueue底层是基于链表实现的有界阻塞队列。
PriorityBlockingQueue是基于优先级的*阻塞队列,队列里面的任务必须有一定的排序规则
SynchronizedQueue是一个不存储元素的队列,其实也不叫队列,因为它不存储元素,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
hander:任务的拒绝策略,当线程池里面的最大线程数也满的时候,这个时候再过来的任务就需要执行线程的任务拒绝策略了,如下
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
4、DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略。
四、任务提交
在JAVA里面提交任务有几种方式,一个是通过执行execute方法,该方法没有返回值,所以不知道任务是否提交成功了,另一个是submit方法,submit有返回值,具体用哪个根据需要选择即可。
void execute(Runnable command);
Future<?> submit(Runnable task);
五、内部如何实现
下面我们以一个例子来分析任务在线程池里面的执行过程,这里还是以newFixedThreadPool为例
Executor excutor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 20; i++) {
excutor.execute(new Task(i));
}
当我们初始化了一个线程大小为10的线程池时,我们调用execute方法对任务进行提交。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
/**如果当前线程池里面的线程数小于核心线程数 */
if (workerCountOf(c) < corePoolSize) { --------->第一步
/**将当前任务传入Worker类的构造函数,由Worker类负责启动线程处理,true表示是核心线程,需要重新创建 */
if (addWorker(command, true))
return;
c = ctl.get();
}
/**如果当前线程池处于运行状态,且将当前任务放入了阻塞队列 */
if (isRunning(c) && workQueue.offer(command)) { -------->第二步
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) -------->第三步
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) ---------->第四步
reject(command);
}
第一步:获取当前线程池里面的线程数,判断当前的线程数是否比核心线程数小,如果比核心线程数小(否则进入第二步),那么调用addWorker(后面分析)将当前任务丢进去,起一个线程运行。
第二步:如果当前线程池里面线程数比核心线程数大,首先判断当前线程池是否处于RUNNING状态,如果处于,将任务offer到workQueue里面。
第三步:然后再次检查线程池的运行状态,如果此时线程池不处于RUNNING阶段,那么就移除阻塞队列里的任务,同时拒绝该任务。如果第二步没有进来,判断此时线程池里面线程数是否为0,如果为0表示没有可用线程了
第四步: 如果第二步里面检查的线程池运行状态不是RUNNING,那么就调到了第四步,执行addWorker方法创建新的线程执行任务,如果addWoker执行失败,则执行reject方法处理任务。
接下来看一下addWorker方法,是如何创建一个线程来执行任务的
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
/**获取当前线程池的运行状态 */
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**如果当前状态为Runnig && 假设状态为shutDown且传入的任务为Null,且阻塞队列有任务,这个时候就应该去执行阻塞队列里面的任务,否则返回false */
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/**自旋 */
for (;;) {
/**获取线程池里面的线程数量 */
int wc = workerCountOf(c);
/**如果超过了核心线程数,返回false */
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**CAS设置当前线程池里面的线程个数 */
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/**构建一个Worker类,将当前需要执行的任务传进去 */
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
/** hashSet里面存当前的线程数*/
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
/** 添加成功就启动线程*/
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
/**添加失败后的处理,主要是线程池线程数-1,另外还尝试去中断线程 */
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这个方法重点的主要是这几步
1、首先根据传入参数判断是不是需要创建核心线程(传入的boolean值),如果是核心线程,且小于coreSize,那么就跳出循环,创建一个新线程
2、新线程的创建主要是靠worker类,Worker类继承了AQS,可以方便的使用AQS提供的中断等方法,同时实现了Runnable接口,可以将自己作为一个任务在工作线程中执行,在Worker的run方法然后调用了runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/**尝试去阻塞队列里面去获取任务来执行 */
while (task != null || (task = getTask()) != null) { ------>关键一步
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); --------关键一步
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker是线程池实现的核心,可以看到它一直在while循环判断,getTask方法是自旋从workQueue里面取等待的任务出来执行,每次在执行的时候,都要先获取锁,线程启动之后释放锁。最后回到
execute的实现就分析到这,有很多细枝末节没有说,抓住主干就行,其实很多时候我们在实际工作中并不是很关心某个类或者某项技术的具体实现细节,太多了也不可能全记住,这些方法都有注释,想看的时候看看注释即可,个人愚见!
上一篇: 网站最大并发数以及服务器配置预估
下一篇: php解决高并发问题