Java 高并发六:JDK并发包2详解
1. 线程池的基本使用
1.1.为什么需要线程池
平时的业务中,如果要使用多线程,那么我们会在业务开始前创建线程,业务结束后,销毁线程。但是对于业务来说,线程的创建和销毁是与业务本身无关的,只关心线程所执行的任务。因此希望把尽可能多的cpu用在执行任务上面,而不是用在与业务无关的线程创建和销毁上面。而线程池则解决了这个问题,线程池的作用就是将线程进行复用。
1.2.jdk为我们提供了哪些支持
jdk中的相关类图如上图所示。
其中要提到的几个特别的类。
callable类和runable类相似,但是区别在于callable有返回值。
threadpoolexecutor是线程池的一个重要实现。
而executors是一个工厂类。
1.3.线程池的使用
1.3.1.线程池的种类
- new fixedthreadpool 固定数量的线程池,线程池中的线程数量是固定的,不会改变。
- new singlethreadexecutor 单一线程池,线程池中只有一个线程。
- new cachedthreadpool 缓存线程池,线程池中的线程数量不固定,会根据需求的大小进行改变。
- new scheduledthreadpool 计划任务调度的线程池,用于执行计划任务,比如每隔5分钟怎么样,
public static executorservice newfixedthreadpool(int nthreads) { return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); } public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice (new threadpoolexecutor(1, 1, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>())); } public static executorservice newcachedthreadpool() { return new threadpoolexecutor(0, integer.max_value, 60l, timeunit.seconds, new synchronousqueue<runnable>()); }
从方法上来看,显然 fixedthreadpool,singlethreadexecutor,cachedthreadpool都是threadpoolexecutor的不同实例,只是参数不同。
public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue) { this(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, executors.defaultthreadfactory(), defaulthandler); }
下面来简述下 threadpoolexecutor构造函数中参数的含义。
- corepoolsize 线程池中核心线程数的数目
- maximumpoolsize 线程池中最多能容纳多少个线程
- keepalivetime 当现在线程数目大于corepoolsize时,超过keepalivetime时间后,多出corepoolsize的那些线程将被终结。
- unit keepalivetime的单位
- workqueue 当任务数量很大,线程池中线程无法满足时,提交的任务会被放到阻塞队列中,线程空闲下来则会不断从阻塞队列中取数据。
这样在来看上面所说的fixedthreadpool,它的线程的核心数目和最大容纳数目都是一样的,以至于在工作期间,并不会创建和销毁线程。当任务数量很大,线程池中的线程无法满足时,任务将被保存到linkedblockingqueue中,而linkedblockingqueue的大小是integer.max_value。这就意味着,任务不断地添加,会使内存消耗越来越大。
而cachedthreadpool则不同,它的核心线程数量是0,最大容纳数目是integer.max_value,它的阻塞队列是synchronousqueue,这是一个特别的队列,它的大小是0。由于核心线程数量是0,所以必然要将任务添加到synchronousqueue中,这个队列只有一个线程在从中添加数据,同时另一个线程在从中获取数据时,才能成功。独自往这个队列中添加数据会返回失败。当返回失败时,则线程池开始扩展线程,这就是为什么cachedthreadpool的线程数目是不固定的。当60s该线程仍未被使用时,线程则被销毁。
1.4.线程池使用的小例子
1.4.1.简单线程池
import java.util.concurrent.executorservice; import java.util.concurrent.executors; public class threadpooldemo { public static class mytask implements runnable { @override public void run() { system.out.println(system.currenttimemillis() + "thread id:" + thread.currentthread().getid()); try { thread.sleep(1000); } catch (exception e) { e.printstacktrace(); } } } public static void main(string[] args) { mytask mytask = new mytask(); executorservice es = executors.newfixedthreadpool(5); for (int i = 0; i < 10; i++) { es.submit(mytask); } } }
由于使用的newfixedthreadpool(5),但是启动了10个线程,所以每次执行5个,并且 可以很明显的看到线程的复用,threadid是重复的,也就是前5个任务和后5个任务都是同一批线程去执行的。
这里用的是
es.submit(mytask);
还有一种提交方式:
es.execute(mytask);
区别在于submit会返回一个future对象,这个将在以后介绍。
1.4.2.scheduledthreadpool
import java.util.concurrent.executors; import java.util.concurrent.scheduledexecutorservice; import java.util.concurrent.timeunit; public class threadpooldemo { public static void main(string[] args) { scheduledexecutorservice ses = executors.newscheduledthreadpool(10); //如果前面的任务还未完成,则调度不会启动。 ses.schedulewithfixeddelay(new runnable() { @override public void run() { try { thread.sleep(1000); system.out.println(system.currenttimemillis()/1000); } catch (exception e) { // todo: handle exception } } }, 0, 2, timeunit.seconds);//启动0秒后执行,然后周期2秒执行一次 } }
输出:
1454832514
1454832517
1454832520
1454832523
1454832526
...
由于任务执行需要1秒,任务调度必须等待前一个任务完成。也就是这里的每隔2秒的意思是,前一个任务完成后2秒再开启新的一个任务。
2. 扩展和增强线程池
2.1.回调接口
线程池中有一些回调的api来给我们提供扩展的操作。
executorservice es = new threadpoolexecutor(5, 5, 0l, timeunit.seconds, new linkedblockingqueue<runnable>()){ @override protected void beforeexecute(thread t, runnable r) { system.out.println("准备执行"); } @override protected void afterexecute(runnable r, throwable t) { system.out.println("执行完成"); } @override protected void terminated() { system.out.println("线程池退出"); } };
我们可以通过实现threadpoolexecutor的子类去覆盖threadpoolexecutor的beforeexecute,afterexecute,terminated方法来实现在线程执行前后,线程池退出时的日志管理或其他操作。
2.2.拒绝策略
有时候,任务非常繁重,导致系统负载太大。在上面说过,当任务量越来越大时,任务都将放到fixedthreadpool的阻塞队列中,导致内存消耗太大,最终导致内存溢出。这样的情况是应该要避免的。因此当我们发现线程数量要超过最大线程数量时,我们应该放弃一些任务。丢弃时,我们应该把任务记下来,而不是直接丢掉。
threadpoolexecutor中还有另一个构造函数。
public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) { if (corepoolsize < 0 || maximumpoolsize <= 0 || maximumpoolsize < corepoolsize || keepalivetime < 0) throw new illegalargumentexception(); if (workqueue == null || threadfactory == null || handler == null) throw new nullpointerexception(); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; this.workqueue = workqueue; this.keepalivetime = unit.tonanos(keepalivetime); this.threadfactory = threadfactory; this.handler = handler; }
threadfactory我们在后面再介绍。
而handler就是拒绝策略的实现,它会告诉我们,如果任务不能执行了,该怎么做。
共有以上4种策略。
abortpolicy:如果不能接受任务了,则抛出异常。
callerrunspolicy:如果不能接受任务了,则让调用的线程去完成。
discardoldestpolicy:如果不能接受任务了,则丢弃最老的一个任务,由一个队列来维护。
discardpolicy:如果不能接受任务了,则丢弃任务。
executorservice es = new threadpoolexecutor(5, 5, 0l, timeunit.seconds, new linkedblockingqueue<runnable>(), new rejectedexecutionhandler() { @override public void rejectedexecution(runnable r, threadpoolexecutor executor) { system.out.println(r.tostring() + "is discard"); } });
当然我们也可以自己实现rejectedexecutionhandler接口来自己定义拒绝策略。
2.3.自定义threadfactory
刚刚已经看到了,在threadpoolexecutor的构造函数中可以指定threadfactory。
线程池中的线程都是由线程工厂创建出来,我们可以自定义线程工厂。
默认的线程工厂:
static class defaultthreadfactory implements threadfactory { private static final atomicinteger poolnumber = new atomicinteger(1); private final threadgroup group; private final atomicinteger threadnumber = new atomicinteger(1); private final string nameprefix; defaultthreadfactory() { securitymanager s = system.getsecuritymanager(); group = (s != null) ? s.getthreadgroup() : thread.currentthread().getthreadgroup(); nameprefix = "pool-" + poolnumber.getandincrement() + "-thread-"; } public thread newthread(runnable r) { thread t = new thread(group, r, nameprefix + threadnumber.getandincrement(), 0); if (t.isdaemon()) t.setdaemon(false); if (t.getpriority() != thread.norm_priority) t.setpriority(thread.norm_priority); return t; } }
3. forkjoin
3.1.思想
就是分而治之的思想。
fork/join类似mapreduce算法,两者区别是:fork/join 只有在必要时如任务非常大的情况下才分割成一个个小任务,而 mapreduce总是在开始执行第一步进行分割。看来,fork/join更适合一个jvm内线程级别,而mapreduce适合分布式系统。
4.2.使用接口
recursiveaction:无返回值
recursivetask:有返回值
4.3.简单例子
import java.util.arraylist; import java.util.concurrent.forkjoinpool; import java.util.concurrent.forkjointask; import java.util.concurrent.recursivetask; public class counttask extends recursivetask<long>{ private static final int threshold = 10000; private long start; private long end; public counttask(long start, long end) { super(); this.start = start; this.end = end; } @override protected long compute() { long sum = 0; boolean cancompute = (end - start) < threshold; if(cancompute) { for (long i = start; i <= end; i++) { sum = sum + i; } }else { //分成100个小任务 long step = (start + end)/100; arraylist<counttask> subtasks = new arraylist<counttask>(); long pos = start; for (int i = 0; i < 100; i++) { long lastone = pos + step; if(lastone > end ) { lastone = end; } counttask subtask = new counttask(pos, lastone); pos += step + 1; subtasks.add(subtask); subtask.fork();//把子任务推向线程池 } for (counttask t : subtasks) { sum += t.join();//等待所有子任务结束 } } return sum; } public static void main(string[] args) { forkjoinpool forkjoinpool = new forkjoinpool(); counttask task = new counttask(0, 200000l); forkjointask<long> result = forkjoinpool.submit(task); try { long res = result.get(); system.out.println("sum = " + res); } catch (exception e) { // todo: handle exception e.printstacktrace(); } } }
上述例子描述了一个累加和的任务。将累加任务分成100个任务,每个任务只执行一段数字的累加和,最后join后,把每个任务计算出的和再累加起来。
4.4.实现要素
4.4.1.workqueue与ctl
每一个线程都会有一个工作队列
static final class workqueue
在工作队列中,会有一系列对线程进行管理的字段
volatile int eventcount; // encoded inactivation count; < 0 if inactive
int nextwait; // encoded record of next event waiter
int nsteals; // number of steals
int hint; // steal index hint
short poolindex; // index of this queue in pool
final short mode; // 0: lifo, > 0: fifo, < 0: shared
volatile int qlock; // 1: locked, -1: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
forkjointask<?>[] array; // the elements (initially unallocated)
final forkjoinpool pool; // the containing pool (may be null)
final forkjoinworkerthread owner; // owning thread or null if shared
volatile thread parker; // == owner during call to park; else null
volatile forkjointask<?> currentjoin; // task being joined in awaitjoin
forkjointask<?> currentsteal; // current non-local task being executed
这里要注意的是,jdk7和jdk8在forkjoin的实现上有了很大的差别。我们这里介绍的是jdk8中的。 在线程池中,有时不是所有的线程都在执行的,部分线程会被挂起,那些挂起的线程会被存放到一个栈中。内部通过一个链表表示。
nextwait会指向下一个等待的线程。
poolindex线程在线程池中的下标索引。
eventcount 在初始化时,eventcount与poolindex有关。总共32位,第一位表示是否被激活,15位表示被挂起的次数
eventcount,剩下的表示poolindex。用一个字段来表示多个意思。
工作队列workqueue用forkjointask<?>[] array来表示。而top,base来表示队列的两端,数据在这两者之间。
在forkjoinpool中维护着ctl(64位long型)
volatile long ctl;
* field ctl is a long packed with:
* ac: number of active running workers minus target parallelism (16 bits)
* tc: number of total workers minus target parallelism (16 bits)
* st: true if pool is terminating (1 bit)
* ec: the wait count of top waiting thread (15 bits)
* id: poolindex of top of treiber stack of waiters (16 bits)
ac表示活跃的线程数减去并行度(大概就是cpu个数)
tc表示总的线程数减去并行度
st表示线程池本身是否是激活的
ec表示顶端等待线程的挂起数
id表示顶端等待线程的poolindex
很明显st+ec+id就是我们刚刚所说的 eventcount 。
那么为什么明明5个变量,非要合成一个变量呢。其实用5个变量占用容量也差不多。
用一个变量代码的可读性上会差很多。
那么为什么用一个变量呢?其实这点才是最巧妙的地方,因为这5个变量是一个整体,在多线程中,如果用5个变量,那么当修改其中一个变量时,如何保证5个变量的整体性。那么用一个变量则就解决了这个问题。如果用锁解决,则会降低性能。
用一个变量则保证了数据的一致性和原子性。
在forkjoin中队ctl的更改都是使用cas操作,在前面系列的文章中已经介绍过,cas是无锁的操作,性能很好。
由于cas操作也只能针对一个变量,所以这种设计是最优的。
4.4.2.工作窃取
接下来要介绍下整个线程池的工作流程。
每个线程都会调用runworker
final void runworker(workqueue w) { w.growarray(); // allocate queue for (int r = w.hint; scan(w, r) == 0; ) { r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
scan()函数是扫描是否有任务要做。
r是一个相对随机的数字。
private final int scan(workqueue w, int r) { workqueue[] ws; int m; long c = ctl; // for consistency check if ((ws = workqueues) != null && (m = ws.length - 1) >= 0 && w != null) { for (int j = m + m + 1, ec = w.eventcount;;) { workqueue q; int b, e; forkjointask<?>[] a; forkjointask<?> t; if ((q = ws[(r - j) & m]) != null && (b = q.base) - q.top < 0 && (a = q.array) != null) { long i = (((a.length - 1) & b) << ashift) + abase; if ((t = ((forkjointask<?>) u.getobjectvolatile(a, i))) != null) { if (ec < 0) helprelease(c, ws, w, q, b); else if (q.base == b && u.compareandswapobject(a, i, t, null)) { u.putorderedint(q, qbase, b + 1); if ((b + 1) - q.top < 0) signalwork(ws, q); w.runtask(t); } } break; } else if (--j < 0) { if ((ec | (e = (int)c)) < 0) // inactive or terminating return awaitwork(w, c, ec); else if (ctl == c) { // try to inactivate and enqueue long nc = (long)ec | ((c - ac_unit) & (ac_mask|tc_mask)); w.nextwait = e; w.eventcount = ec | int_sign; if (!u.compareandswaplong(this, ctl, c, nc)) w.eventcount = ec; // back out } break; } } } return 0; }
我们接下来看看scan方法,scan的一个参数是workqueue,上面已经说过,每个线程都会拥有一个workqueue,那么多个线程的workqueue就会保存在workqueues里面,r是一个随机数,通过r来找到某一个 workqueue,在workqueue里面有所要做的任务。
然后通过workqueue的base,取得base的偏移量。
b = q.base
..
long i = (((a.length - 1) & b) << ashift) + abase;
..
然后通过偏移量得到最后一个的任务,运行这个任务
t = ((forkjointask<?>)u.getobjectvolatile(a, i))
..
w.runtask(t);
..
通过这个大概的分析理解了过程,我们发现,当前线程调用scan方法后,不会执行当前的workqueue中的任务,而是通过一个随机数r,来得到其他 workqueue的任务。这就是forkjoinpool的主要的一个机理。
当前线程不会只着眼于自己的任务,而是优先完成其他任务。这样做来,防止了饥饿现象的发生。这样就预防了某些线程因为卡死或者其他原因而无法及时完成任务,或者某个线程的任务量很大,其他线程却没事可做。
然后来看看runtask方法
final void runtask(forkjointask<?> task) { if ((currentsteal = task) != null) { forkjoinworkerthread thread; task.doexec(); forkjointask<?>[] a = array; int md = mode; ++nsteals; currentsteal = null; if (md != 0) pollandexecall(); else if (a != null) { int s, m = a.length - 1; forkjointask<?> t; while ((s = top - 1) - base >= 0 && (t = (forkjointask<?>)u.getandsetobject (a, ((m & s) << ashift) + abase, null)) != null) { top = s; t.doexec(); } } if ((thread = owner) != null) // no need to do in finally clause thread.aftertoplevelexec(); } }
有一个有趣的命名:currentsteal,偷得的任务,的确是刚刚解释的那样。
task.doexec();
将会完成这个任务。
完成了别人的任务以后,将会完成自己的任务。
通过得到top来获得自己任务第一个任务
while ((s = top - 1) - base >= 0 && (t = (forkjointask<?>)u.getandsetobject(a, ((m & s) << ashift) + abase, null)) != null) { top = s; t.doexec(); }
接下来,通过一个图来总结下刚刚线程池的流程
比如有t1,t2两个线程,t1会通过t2的base来获得t2的最后一个任务(当然实际上是通过一个随机数r来取得某个线程最后一个任务),t1也会通过自己的top来执行自己的第一个任务。反之,t2也会如此。
拿其他线程的任务都是从base开始拿的,自己拿自己的任务是从top开始拿的。这样可以减少冲突
如果没有找到其他任务
else if (--j < 0) { if ((ec | (e = (int)c)) < 0) // inactive or terminating return awaitwork(w, c, ec); else if (ctl == c) { // try to inactivate and enqueue long nc = (long)ec | ((c - ac_unit) & (ac_mask|tc_mask)); w.nextwait = e; w.eventcount = ec | int_sign; if (!u.compareandswaplong(this, ctl, c, nc)) w.eventcount = ec; // back out } break; }
那么首先会通过一系列运行来改变ctl的值,获得了nc,然后用cas将新的值赋值。然后就调用awaitwork()将线程进入等待状态(调用的 前面系列文章中提到的unsafe的park方法)。
这里要说明的是改变ctl值这里,首先是将ctl中的ac-1,ac是占ctl的前16位,所以不能直接-1,而是通过ac_unit(0x1000000000000)来达到使ctl的前16位-1的效果。
前面说过eventcount中有保存poolindex,通过poolindex以及workqueue中的nextwait,就能遍历所有的等待线程。