ThreadPoolExecutor是如何做到线程重用的
ThreadPoolExecutor是如何做到线程重用的
<ul class="article_tags clearfix csdn-tracking-statistics" data-mod="popu_377">
<li class="tit">标签:</li>
看关于ThreadPoolExecutor参数时,看到了keepaliveTime这个参数,这个参数的意思是:“当线程数大于CorePoolSize时,如果有没有等到新的Task,到了keepaliveTime时间后,就自动终止掉”。那么如果在这个时间之前,等到了新的Task,就可以重用这个线程。到底是怎么重用线程的呢?
正文:
原理如下:
前提条件:假如coreSize=3,maxSize=10,当前存在线程数是5。
(注意,存在的这5个线程,并不是你执行ExecuteService.execute/submit时的参数,而是为了执行execute/submit的参数所启动的“内部线程”。这个“内部线程”其实是通过ThreadPoolExecutor的ThreadFactory参数生成的线程,而“execute/submit的参数”是执行在这些“内部线程”里面的。)存在这5个“内部线程”,都访问同一个队列,从队列中去取任务执行(任务就是通过execute/submit提交的Runnable参数),当任务充足时,5个“内部线程”都持续执行。重点是没有任务时怎么办?
没有任务时,这5个“内部线程”都会做下面判断:
- 如果poolSize > coreSize,那就从队列里取任务,当过了keepaliveTime这么长时间还没有得到任务的话,当前这个“内部线程”就会结束(使用的是BlockingQueue.poll方法)。
- 如果poolSize <= coreSize,那就以“阻塞”的方式,去从队列里取任务,当得到任务后,就继续执行。这样的话,这个线程就不会结束掉。
如果没有任务可以继续执行了,最后只剩下coreSize那么多的“内部线程”留在线程池里,等待重用。
相关代码如下:
1,从ExecuteService.execute方法开始说,execute方法代码如下:
int c = ctl.get();
// 当“内部线程”数 < coreSize的话,就新建一个“内部线程”
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 能执行到这里,说明“内部线程”数 >= coreSize。这样的话,就把任务加入队列。
// 这个队列,就是上面说的所有“内部线程”访问的那个队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
2,下面再看一下addWorker方法,看看“内部线程”是如何建立的。
// 上面一部分代码省略
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个Worker实例,把我们提交的任务(也就是firstTask)传入到Worker里。
// 然后,取得Worker里的thread的属性,并在下面运行这个thread。
// (其实这个thread属性,就是通过ThreadFactory生成的Thread,也就是所说的“内部线程”)
// 接下来我们看看Worker类
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
下面我们看看Worker类,其中和本次说明没有什么关系的代码省略。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// 这里把提交的任务保存到Worker里,在后面执行的时候,从Worker里取得这个任务执行。
this.firstTask = firstTask;
// 下面就是生成“内部线程”的代码
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// 这个方法,是“内部线程”实际上执行的方法。原理是这样的:
// 上面构造函数在生成“内部线程”时,把Worker本身,当做参数传给了“内部线程”
// “内部线程”在自己的run方法里,执行的是worker的run方法,而worker的run方法执行的是runWorker这个方法。
// runWorker方法是ThreadPoolExectuor的方法,也就是说,线程池里的“内部线程”在调用run方法时,都是执行的这个方法
//(所以这个方法要注意同步)
runWorker(this);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
3,最后我们看看runWorker方法。
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task!=null是为了运行“内部线程”启动时,提交的给它的那个线程。
// (task = getTask()) != null) 是当提交给他的任务执行完后,看看队列里还有没有任务可以执行。
// 如果有的话,这个“内部线程”就可以重用了。
// 而“内部线程都会做的判断”,就是getTask()方法里面做的,下面看一下getTask()方法
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {// 省略异常处理
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
getTask()方法原代码如下
// 这个语句后半部分来判断“内部线程数 > coreSize”
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 下面的判断,就是上面说的“内部线程都会做的判断”
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
最后
如果要测试看看线程是不是被重用的话,可以通过自己实现ThreadFactory,加个打印语句,来查看newThread方法被调用的次数。下面的代码是参考Executors的代码,加了一条打印语句
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
// 通过查看调用的次数,可以来判断new了几个内部线程
System.out.println(namePrefix + threadNumber.get() + " is created " + " hashcode:" + t.hashCode());
return t;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
其它
下面的这篇文章是对线程池的一个说明,里面有一个简单的线程池的实现,和ThreadPoolExecutor有些类似,可以参考一下:
Thread pools and work queues
版权声明:本文为博主原创文章,未经博主允许随便转载。(你喜欢就好)
- 本文已收录于以下专栏:
import java.util.concurrent.BlockingQueue;
import java….
(function() {
var s = "_" + Math.random().toString(36).slice(2);
document.write('');
(window.slotbydup=window.slotbydup || []).push({
id: '4765209',
container: s,
size: '808,120',
display: 'inlay-fix'
});
})();
【免费技术直播】数据科学家,从入门到精进
Java 多线程编程之九:使用 Executors 和 ThreadPoolExecutor 实现的 Java 线程池的例子
线程池用来管理工作线程的数量,它持有一个等待被执行的线程的队列。
java.util.concurrent.Executors 提供了 java.util.concurrent.Exe…
Java中的线程池——ThreadPoolExecutor的使用
开发过程中,合理地使用线程池可以带来3个好处:
降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线…
java并发包学习系列:线程复用之线程池
频繁使用new Thread来创建线程的方式并不太好。因为每次new Thread新建和销毁对象性能较差,线程缺乏统一管理。好在Java提供了线程池,它能够有效的管理、调度线程,避免过多的资源消耗。优…
(function() {
var s = "_" + Math.random().toString(36).slice(2);
document.write('');
(window.slotbydup=window.slotbydup || []).push({
id: '4983339',
container: s,
size: '808,120',
display: 'inlay-fix'
});
})();
人人都能看懂的 AI 入门课
Java 线程池ThreadPoolExecutor(基于jdk1.8)(二)
上一篇分析了ThreadPoolExecutor的execute方法的具体执行过程,这一篇主要分析当中的几个重要的函数。…
Java多线程:ThreadPoolExecutor详解
ThreadPoolExecutor是JDK并发包提供的一个线程池服务,基于ThreadPoolExecutor可以很容易将一个Runnable接口的任务放入线程池中。
ThreadPoolExecu…
Java史上最大误解,你真的以为LinkedList比ArrayList增删快?
从我们学习java的List时候就被灌输这样的一个概念:
ArrayList随机访问快;
LinkedList增删快;然而事实真的是这样吗?请看以下代码:(任何人都能复制这份代码进行尝试)pack…
80秒验证13亿个身份证号码,包含省市县验证
我写了一个验证身份证号码的程序,它是以一定内存空间(大概100M)换取cpu消耗,然后它的运算量就降低了,前十四位的验证就相当于转换类型再查表一样,所以它的验证号码速度比一般的方式快。如果还不明白就说…
ThreadPoolExecutor 是如何做到线程重用的
前言:
看关于ThreadPoolExecutor参数时,看到了keepaliveTime这个参数,这个参数的意思是:“当线程数大于CorePoolSize时,如果有没有等到新的Task,到了ke…
线程测试参数ThreadPoolExecutor()
1 JDK 自带线程池
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(in…
ThreadPoolExecutor线程池及线程扩展策略
一、概述
1、ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务;
2、E…
Java 线程池学习 Reference: 《创建Java线程池》[1],《Java线程:新特征-线程池》[2], 《Java线程池学习》[3],《线程池ThreadPoolExecutor使用简介
Java 线程池学习
Reference: 《创建Java线程池》[1],《Java线程:新特征-线程池》[2], 《Java线程池学习》[3],《线程池ThreadPoolExecutor使用…
ThreadPoolExecutor,worker和线程工厂之间理解
ThreadPoolExecutor中一个线程就是一个Worker对象,它与一个线程绑定,当Worker执行完毕就是线程执行完毕,这个在后面详细讨论线程池中线程的运行方式。而Worker带了锁,根据我…
Java多线程之~~~使用ThreadPoolExecutor来创建线程
以前我们创建线程的时候都是主动的new一个Thread,然后调用他们的start方法,但是如果线程非常多,任务也非
常多的时候,这样写就会显得非常麻烦,当然可能效率也不是很高,Java给我们提供了叫线…
JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用简介,线程邮件发送实例
在多线程大师Doug Lea的贡献下,在JDK1.5中加入了许多对并发特性的支持,例如:线程池。
一、简介
线程池类为 java.util.concurrent.ThreadPoolExecuto…
线程学习三:线程池ThreadPoolExecutor 与 Executors
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*…
ThreadPoolExecutor中运行线程名称的修改
项目中使用到了ThreadPoolExecutor,这个是挺好的东西,线程池的实现。但有一点不太爽的是,用Jprofiler调试由它创建的线程的时候,看到的都是pool-1-thread-1\2\3…
java线程API学习 线程池ThreadPoolExecutor
转自:http://blog.csdn.net/ABBuggy/archive/2011/06/16/6548843.aspx
线程池ThreadPoolExecutor继承自ExecutorSer…
Java多线程 之 ThreadPoolExecutor(九)
最近在工作中遇到了ThreadPoolExecutor的使用,而且是由于它的配置不当导致了线上问题。下面对其进行简单介绍。
先看看ThreadPoolExecutor常用的构造方法: public…