Java concurrency线程池之线程池原理(二)_动力节点Java学院整理
线程池示例
在分析线程池之前,先看一个简单的线程池示例。
import java.util.concurrent.executors; import java.util.concurrent.executorservice; public class threadpooldemo1 { public static void main(string[] args) { // 创建一个可重用固定线程数的线程池 executorservice pool = executors.newfixedthreadpool(2); // 创建实现了runnable接口对象,thread对象当然也实现了runnable接口 thread ta = new mythread(); thread tb = new mythread(); thread tc = new mythread(); thread td = new mythread(); thread te = new mythread(); // 将线程放入池中进行执行 pool.execute(ta); pool.execute(tb); pool.execute(tc); pool.execute(td); pool.execute(te); // 关闭线程池 pool.shutdown(); } } class mythread extends thread { @override public void run() { system.out.println(thread.currentthread().getname()+ " is running."); } }
运行结果:
pool-1-thread-1 is running. pool-1-thread-2 is running. pool-1-thread-1 is running. pool-1-thread-2 is running. pool-1-thread-1 is running.
示例中,包括了线程池的创建,将任务添加到线程池中,关闭线程池这3个主要的步骤。稍后,我们会从这3个方面来分析threadpoolexecutor。
线程池源码分析
(一) 创建“线程池”
下面以newfixedthreadpool()介绍线程池的创建过程。
1. newfixedthreadpool()
newfixedthreadpool()在executors.java中定义,源码如下:
public static executorservice newfixedthreadpool(int nthreads) { return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); }
说明:newfixedthreadpool(int nthreads)的作用是创建一个线程池,线程池的容量是nthreads。
newfixedthreadpool()在调用threadpoolexecutor()时,会传递一个linkedblockingqueue()对象,而linkedblockingqueue是单向链表实现的阻塞队列。在线程池中,就是通过该阻塞队列来实现"当线程池中任务数量超过允许的任务数量时,部分任务会阻塞等待"。
关于linkedblockingqueue的实现细节,读者可以参考"java多线程系列--“juc集合”08之 linkedblockingqueue"。
2. threadpoolexecutor()
threadpoolexecutor()在threadpoolexecutor.java中定义,源码如下:
public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue) { this(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, executors.defaultthreadfactory(), defaulthandler); }
说明:该函数实际上是调用threadpoolexecutor的另外一个构造函数。该函数的源码如下:
public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) { if (corepoolsize < 0 || maximumpoolsize <= 0 || maximumpoolsize < corepoolsize || keepalivetime < 0) throw new illegalargumentexception(); if (workqueue == null || threadfactory == null || handler == null) throw new nullpointerexception(); // 核心池大小 this.corepoolsize = corepoolsize; // 最大池大小 this.maximumpoolsize = maximumpoolsize; // 线程池的等待队列 this.workqueue = workqueue; this.keepalivetime = unit.tonanos(keepalivetime); // 线程工厂对象 this.threadfactory = threadfactory; // 拒绝策略的句柄 this.handler = handler; }
说明:在threadpoolexecutor()的构造函数中,进行的是初始化工作。
corepoolsize, maximumpoolsize, unit, keepalivetime和workqueue这些变量的值是已知的,它们都是通过newfixedthreadpool()传递而来。下面看看threadfactory和handler对象。
2.1 threadfactory
线程池中的threadfactory是一个线程工厂,线程池创建线程都是通过线程工厂对象(threadfactory)来完成的。
上面所说的threadfactory对象,是通过 executors.defaultthreadfactory()返回的。executors.java中的defaultthreadfactory()源码如下:
public static threadfactory defaultthreadfactory() { return new defaultthreadfactory(); }
defaultthreadfactory()返回defaultthreadfactory对象。executors.java中的defaultthreadfactory()源码如下:
static class defaultthreadfactory implements threadfactory { private static final atomicinteger poolnumber = new atomicinteger(1); private final threadgroup group; private final atomicinteger threadnumber = new atomicinteger(1); private final string nameprefix; defaultthreadfactory() { securitymanager s = system.getsecuritymanager(); group = (s != null) ? s.getthreadgroup() : thread.currentthread().getthreadgroup(); nameprefix = "pool-" + poolnumber.getandincrement() + "-thread-"; } // 提供创建线程的api。 public thread newthread(runnable r) { // 线程对应的任务是runnable对象r thread t = new thread(group, r, nameprefix + threadnumber.getandincrement(), 0); // 设为“非守护线程” if (t.isdaemon()) t.setdaemon(false); // 将优先级设为“thread.norm_priority” if (t.getpriority() != thread.norm_priority) t.setpriority(thread.norm_priority); return t; } }
说明:threadfactory的作用就是提供创建线程的功能的线程工厂。
它是通过newthread()提供创建线程功能的,下面简单说说newthread()。newthread()创建的线程对应的任务是runnable对象,它创建的线程都是“非守护线程”而且“线程优先级都是thread.norm_priority”。
2.2 rejectedexecutionhandler
handler是threadpoolexecutor中拒绝策略的处理句柄。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。
线程池默认会采用的是defaulthandler策略,即abortpolicy策略。在abortpolicy策略中,线程池拒绝任务时会抛出异常!
defaulthandler的定义如下:
private static final rejectedexecutionhandler defaulthandler = new abortpolicy();
abortpolicy的源码如下:
public static class abortpolicy implements rejectedexecutionhandler { public abortpolicy() { } // 抛出异常 public void rejectedexecution(runnable r, threadpoolexecutor e) { throw new rejectedexecutionexception("task " + r.tostring() + " rejected from " + e.tostring()); } }
(二) 添加任务到“线程池”
1. execute()
execute()定义在threadpoolexecutor.java中,源码如下:
public void execute(runnable command) { // 如果任务为null,则抛出异常。 if (command == null) throw new nullpointerexception(); // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息 int c = ctl.get(); // 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corepoolsize个任务。 // 则通过addworker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 if (workercountof(c) < corepoolsize) { if (addworker(command, true)) return; c = ctl.get(); } // 当线程池中的任务数量 >= "核心池大小"时, // 而且,"线程池处于允许状态"时,则尝试将任务添加到阻塞队列中。 if (isrunning(c) && workqueue.offer(command)) { // 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。 int recheck = ctl.get(); if (! isrunning(recheck) && remove(command)) reject(command); // 否则,如果"线程池中任务数量"为0,则通过addworker(null, false)尝试新建一个线程,新建线程对应的任务为null。 else if (workercountof(recheck) == 0) addworker(null, false); } // 通过addworker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 // 如果addworker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 else if (!addworker(command, false)) reject(command); }
说明:execute()的作用是将任务添加到线程池中执行。它会分为3种情况进行处理:
情况1 -- 如果"线程池中任务数量" < "核心池大小"时,即线程池中少于corepoolsize个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。
情况2 -- 如果"线程池中任务数量" >= "核心池大小",并且"线程池是允许状态";此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认"线程池的状态",如果"第2次读到的线程池状态"和"第1次读到的线程池状态"不同,则从阻塞队列中删除该任务。
情况3 -- 非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。
2. addworker()
addworker()的源码如下:
private boolean addworker(runnable firsttask, boolean core) { retry: // 更新"线程池状态和计数"标记,即更新ctl。 for (;;) { // 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息 int c = ctl.get(); // 获取线程池状态。 int rs = runstateof(c); // 有效性检查 if (rs >= shutdown && ! (rs == shutdown && firsttask == null && ! workqueue.isempty())) return false; for (;;) { // 获取线程池中任务的数量。 int wc = workercountof(c); // 如果"线程池中任务的数量"超过限制,则返回false。 if (wc >= capacity || wc >= (core ? corepoolsize : maximumpoolsize)) return false; // 通过cas函数将c的值+1。操作失败的话,则退出循环。 if (compareandincrementworkercount(c)) break retry; c = ctl.get(); // re-read ctl // 检查"线程池状态",如果与之前的状态不同,则从retry重新开始。 if (runstateof(c) != rs) continue retry; // else cas failed due to workercount change; retry inner loop } } boolean workerstarted = false; boolean workeradded = false; worker w = null; // 添加任务到线程池,并启动任务所在的线程。 try { final reentrantlock mainlock = this.mainlock; // 新建worker,并且指定firsttask为worker的第一个任务。 w = new worker(firsttask); // 获取worker对应的线程。 final thread t = w.thread; if (t != null) { // 获取锁 mainlock.lock(); try { int c = ctl.get(); int rs = runstateof(c); // 再次确认"线程池状态" if (rs < shutdown || (rs == shutdown && firsttask == null)) { if (t.isalive()) // precheck that t is startable throw new illegalthreadstateexception(); // 将worker对象(w)添加到"线程池的worker集合(workers)"中 workers.add(w); // 更新largestpoolsize int s = workers.size(); if (s > largestpoolsize) largestpoolsize = s; workeradded = true; } } finally { // 释放锁 mainlock.unlock(); } // 如果"成功将任务添加到线程池"中,则启动任务所在的线程。 if (workeradded) { t.start(); workerstarted = true; } } } finally { if (! workerstarted) addworkerfailed(w); } // 返回任务是否启动。 return workerstarted; }
说明:
addworker(runnable firsttask, boolean core) 的作用是将任务(firsttask)添加到线程池中,并启动该任务。
core为true的话,则以corepoolsize为界限,若"线程池中已有任务数量>=corepoolsize",则返回false;core为false的话,则以maximumpoolsize为界限,若"线程池中已有任务数量>=maximumpoolsize",则返回false。
addworker()会先通过for循环不断尝试更新ctl状态,ctl记录了"线程池中任务数量和线程池状态"。
更新成功之后,再通过try模块来将任务添加到线程池中,并启动任务所在的线程。
从addworker()中,我们能清晰的发现:线程池在添加任务时,会创建任务对应的worker对象;而一个workder对象包含一个thread对象。(01) 通过将worker对象添加到"线程的workers集合"中,从而实现将任务添加到线程池中。 (02) 通过启动worker对应的thread线程,则执行该任务。
3. submit()
补充说明一点,submit()实际上也是通过调用execute()实现的,源码如下:
public future<?> submit(runnable task) { if (task == null) throw new nullpointerexception(); runnablefuture<void> ftask = newtaskfor(task, null); execute(ftask); return ftask; }
(三) 关闭“线程池”
shutdown()的源码如下:
public void shutdown() { final reentrantlock mainlock = this.mainlock; // 获取锁 mainlock.lock(); try { // 检查终止线程池的“线程”是否有权限。 checkshutdownaccess(); // 设置线程池的状态为关闭状态。 advancerunstate(shutdown); // 中断线程池中空闲的线程。 interruptidleworkers(); // 钩子函数,在threadpoolexecutor中没有任何动作。 onshutdown(); // hook for scheduledthreadpoolexecutor } finally { // 释放锁 mainlock.unlock(); } // 尝试终止线程池 tryterminate(); }
说明:shutdown()的作用是关闭线程池。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
推荐阅读
-
Java concurrency线程池之线程池原理(一)_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(二)_动力节点Java学院整理
-
Java concurrency之CountDownLatch原理和示例_动力节点Java学院整理
-
Java class文件格式之常量池_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(四)_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(三)_动力节点Java学院整理
-
Java concurrency线程池之Callable和Future_动力节点Java学院整理
-
Java concurrency之公平锁(二)_动力节点Java学院整理
-
Java concurrency之CountDownLatch原理和示例_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(二)_动力节点Java学院整理