并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)
前两篇文章讲了线程池的源码分析,再来看这篇文章就比较简单了, 本文主要讲解 executors 这个工具类,看看长江创建线程池的几种方法。
newfixedthreadpool
- 生成一个固定大小的线程池:
1 public static executorservice newfixedthreadpool(int nthreads) { 2 return new threadpoolexecutor(nthreads, nthreads, 3 0l, timeunit.milliseconds, 4 new linkedblockingqueue<runnable>()); 5 }
最大线程数设置为与核心线程数相等,则不会创建临时线程,创建的线程都是核心线程,线程也不会被回收。此时 keepalivetime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corepoolsize 内的线程),任务队列采用 linkedblockingqueue,*队列,所以fixedthreadpool永远不会拒绝, 即饱和策略失效。
过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nthreads 后,不再创建新的线程,而是把任务提交到 linkedblockingqueue 中,而且之后线程数始终为 nthreads。
newsinglethreadexecutor
- 生成只有一个线程的固定线程池
1 public static executorservice newsinglethreadexecutor() { 2 return new finalizabledelegatedexecutorservice 3 (new threadpoolexecutor(1, 1, 4 0l, timeunit.milliseconds, 5 new linkedblockingqueue<runnable>())); 6 }
这个更简单,和上面的一样,只要设置线程数为 1 就可以了。
初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。
由于使用了*队列, 所以singlethreadpool永远不会拒绝, 即饱和策略失效。
newcachedthreadpool
- 生成一个需要的时候就创建新的线程
1 public static executorservice newsinglethreadexecutor() { 2 return new finalizabledelegatedexecutorservice 3 (new threadpoolexecutor(1, 1, 4 0l, timeunit.milliseconds, 5 new linkedblockingqueue<runnable>())); 6 }
核心线程数为 0,最大线程数为 integer.max_value,keepalivetime 为 60 秒,任务队列采用 synchronousqueue,所以创建的线程都是临时线程,都可以被回收。
这种线程池对于任务可以比较快速地完成的情况有比较好的性能。如果线程空闲了 60 秒都没有任务,那么将关闭此线程并从线程池中移除。所以如果线程池空闲了很长时间也不会有问题,因为随着所有的线程都会被关闭,整个线程池不会占用任何的系统资源。
1 int c = ctl.get(); 2 // corepoolsize 为 0,所以不会进到这个 if 分支 3 if (workercountof(c) < corepoolsize) { 4 if (addworker(command, true)) 5 return; 6 c = ctl.get(); 7 } 8 // offer 如果有空闲线程刚好可以接收此任务,那么返回 true,否则返回 false 9 if (isrunning(c) && workqueue.offer(command)) { 10 int recheck = ctl.get(); 11 if (! isrunning(recheck) && remove(command)) 12 reject(command); 13 else if (workercountof(recheck) == 0) 14 addworker(null, false); 15 } 16 else if (!addworker(command, false)) 17 reject(command);
过程分析:我把 execute 方法的主体粘贴过来,让大家看得明白些。鉴于 corepoolsize 是 0,那么提交任务的时候,直接将任务提交到队列中,由于采用了 synchronousqueue,所以如果是第一个任务提交的时候,offer 方法肯定会返回 false,因为此时没有任何 worker 对这个任务进行接收,那么将进入到最后一个分支来创建第一个 worker,第一个worker执行完后就gettask()从队列中取任务。之后再提交任务的话,取决于是否有空闲下来的线程对任务进行接收,如果有,会进入到第二个 if 语句块中把当前任务给正在等待的worker,如果没有空闲的线程在等待取任务,就是和第一个任务一样,进到最后的 else if 分支创建worker。
我们来仔细分析下代码,第一次添加任务时,执行到第9行 workqueue.offer(command),我把以前文章里面的offer()代码贴过来,如果有感兴趣的可以去看看《并发编程(十)—— java 并发队列 blockingqueue 实现之 synchronousqueue源码分析》
1 public boolean offer(e e) { 2 if (e == null) throw new nullpointerexception(); 3 return transferer.transfer(e, true, 0) != null; 4 }
1 /** 2 * puts or takes an item. 3 */ 4 object transfer(object e, boolean timed, long nanos) { 5 6 qnode s = null; // constructed/reused as needed 7 boolean isdata = (e != null); 8 9 for (;;) { 10 qnode t = tail; 11 qnode h = head; 12 if (t == null || h == null) // saw uninitialized value 13 //说明还没有初始化,则跳出继续循环,直至初始化完成 14 continue; // spin 15 16 // 走到这里,说明已经初始化完成,但是初始化时head = h;tail = h;head和tail都是相同的空节点 17 // 如果h == t为false,则判断t.isdata == isdata,判断队尾节点和当前节点类型是否一致 18 // 队列空,或队列中节点类型和当前节点一致, 19 // 即我们说的第一种情况,将节点入队即可。读者要想着这块 if 里面方法其实就是入队 20 if (h == t || t.isdata == isdata) { // empty or same-mode 21 qnode tn = t.next; 22 // t != tail 说明刚刚有节点入队,continue 即可 23 if (t != tail) // inconsistent read 24 continue; 25 // 有其他节点入队,但是 tail 还是指向原来的,此时设置 tail 即可 26 if (tn != null) { // lagging tail 27 // 这个方法就是:如果 tail 此时为 t 的话,设置为 tn 28 advancetail(t, tn); 29 continue; 30 } 31 // 32 if (timed && nanos <= 0) // can't wait 33 return null; 34 // s == null,则创建一个新节点 35 if (s == null) 36 s = new qnode(e, isdata); 37 // 将当前节点,插入到 tail 的后面 38 if (!t.casnext(null, s)) // failed to link in 39 continue; 40 41 // 将当前节点设置为新的 tail 42 advancetail(t, s); // swing tail and wait 43 // 看到这里,请读者先往下滑到这个方法,看完了以后再回来这里,思路也就不会断了 44 object x = awaitfulfill(s, e, timed, nanos); 45 // 到这里,说明之前入队的线程被唤醒了,准备往下执行 46 // 若返回的x == s表示,当前线程已经超时或者中断,不然的话s == null或者是匹配的节点 47 if (x == s) { // wait was cancelled 48 clean(t, s); 49 return null; 50 } 51 // 若s节点被设置为取消 52 if (!s.isofflist()) { // not already unlinked 53 advancehead(t, s); // unlink if head 54 if (x != null) // and forget fields 55 s.item = s; 56 s.waiter = null; 57 } 58 return (x != null) ? x : e; 59 60 // 这里的 else 分支就是上面说的第二种情况,有相应的读或写相匹配的情况 61 } else { // complementary-mode 62 qnode m = h.next; // node to fulfill 63 // 不一致读,表明有其他线程修改了队列 64 if (t != tail || m == null || h != head) 65 continue; // inconsistent read 66 67 object x = m.item; 68 if (isdata == (x != null) || // m already fulfilled 69 x == m || // m cancelled 70 !m.casitem(x, e)) { // lost cas 71 advancehead(h, m); // dequeue and retry 72 continue; 73 } 74 75 advancehead(h, m); // successfully fulfilled 76 locksupport.unpark(m.waiter); 77 return (x != null) ? x : e; 78 } 79 } 80 } 81 82 void advancetail(qnode t, qnode nt) { 83 if (tail == t) 84 unsafe.compareandswapobject(this, tailoffset, t, nt); 85 }
第一次offer(command)时,我们可以看到 transfer 方法中 第32行处 timed && nanos <= 0 成立,此时return null,则offer返回false,所以第一次添加任务时,就会执行最后的 else if (!addworker(command, false)) 添加一个worker,如果这个worker执行完任务,在gettask()中从等待队列中取任务,这时如果有线程提交任务,则在 if (isrunning(c) && workqueue.offer(command)) 处给到空闲的线程;如果等待超过60秒,则关闭此线程;如果此时线程还在执行任务,还有线程提交任务,则还会执行到最后的 else if (!addworker(command, false)) 添加一个worker。
synchronousqueue 是一个比较特殊的 blockingqueue,其本身不储存任何元素,它有一个虚拟队列(或虚拟栈),不管读操作还是写操作,如果当前队列中存储的是与当前操作相同模式的线程,那么当前操作也进入队列中等待;如果是相反模式,则配对成功,从当前队列中取队头节点。具体的信息,可以看我的另一篇关于 blockingqueue 的文章。
上一篇: Tomcat服务器
下一篇: JDK、JRE、JVM三者间的联系与区别