并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析
blockingqueue 实现之 synchronousqueue
synchronousqueue是一个没有数据缓冲的blockingqueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。
不像arrayblockingqueue或linkedlistblockingqueue,synchronousqueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
synchronousqueue的一个使用场景是在线程池里。executors.newcachedthreadpool()就使用了synchronousqueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
接下来,我们来看看具体的源码实现吧,它的源码不是很简单的那种,我们需要先搞清楚它的设计思想。
我们先看大框架:
1 // 构造时,我们可以指定公平模式还是非公平模式,本文主要讲解公平模式 2 public synchronousqueue(boolean fair) { 3 transferer = fair ? new transferqueue() : new transferstack(); 4 } 5 abstract static class transferer { 6 // 从方法名上大概就知道,这个方法用于转移元素,从生产者手上转到消费者手上 7 // 也可以被动地,消费者调用这个方法来从生产者手上取元素 8 // 第一个参数 e 如果不是 null,代表场景为:将元素从生产者转移给消费者 9 // 如果是 null,代表消费者等待生产者提供元素,然后返回值就是相应的生产者提供的元素 10 // 第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值 11 // 返回值如果是 null,代表超时,或者中断。具体是哪个,可以通过检测中断状态得到。 12 abstract object transfer(object e, boolean timed, long nanos); 13 } 14 15 transferqueue() { 16 //初始化时,head和tail都是空节点 17 qnode h = new qnode(null, false); // initialize to dummy node. 18 head = h; 19 tail = h; 20 }
transferer 有两个内部实现类,是因为构造 synchronousqueue 的时候,我们可以指定公平策略。公平模式意味着,所有的读写线程都遵守先来后到,fifo 嘛,对应 transferqueue。而非公平模式则对应 transferstack。
本文我们主要看公平模式源码,接下来,我们看看 put 方法和 take 方法:
1 // 写入值 2 public void put(e o) throws interruptedexception { 3 if (o == null) throw new nullpointerexception(); 4 if (transferer.transfer(o, false, 0) == null) { // 1 5 thread.interrupted(); 6 throw new interruptedexception(); 7 } 8 } 9 // 读取值并移除 10 public e take() throws interruptedexception { 11 object e = transferer.transfer(null, false, 0); // 2 12 if (e != null) 13 return (e)e; 14 thread.interrupted(); 15 throw new interruptedexception(); 16 }
我们看到,写操作 put(e o) 和读操作 take() 都是调用 transferer.transfer(…) 方法,区别在于第一个参数是否为 null 值。
我们来看看 transfer 的设计思路,其基本算法如下:
- 当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列并阻塞线程。
- 如果队列中有等待节点,而且与当前操作可以匹配(1、如队列中都是读操作线程,当前线程是写操作线程;2、如队列中都是写操作线程,当前线程是读操作;)。这种情况下,匹配等待队列的队头,出队,返回相应数据。
其实这里有个隐含的条件被满足了,队列如果不为空,肯定都是同种类型的节点,要么都是读操作,要么都是写操作。这个就要看到底是读线程积压了,还是写线程积压了。
我们可以假设出一个男女配对的场景:
1、一个男的过来,如果一个人都没有,那么他需要等待;如果发现有一堆男的在等待,那么他需要排到队列后面;如果发现是一堆女的在排队,那么他直接牵走队头的那个女的;
2、相反一个女的过来,如果一个人都没有,那么她需要等待;如果发现有一堆女的在等待,那么她需要排到队列后面;如果发现是一堆男的在排队,那么队头的那个男的直接出队牵走这个女的;
既然这里说到了等待队列,我们先看看其实现,也就是 qnode:
1 static final class qnode { 2 volatile qnode next; // 可以看出来,等待队列是单向链表 3 volatile object item; // cas'ed to or from null 4 volatile thread waiter; // 将线程对象保存在这里,用于挂起和唤醒 5 final boolean isdata; // 用于判断是写线程节点(isdata == true),还是读线程节点 6 7 qnode(object item, boolean isdata) { 8 this.item = item; 9 this.isdata = isdata; 10 } 11 ......
我们再来看 transfer 方法的代码:
1 /** 2 * puts or takes an item. 3 */ 4 object transfer(object e, boolean timed, long nanos) { 5 6 qnode s = null; // constructed/reused as needed 7 boolean isdata = (e != null); 8 9 for (;;) { 10 qnode t = tail; 11 qnode h = head; 12 if (t == null || h == null) // saw uninitialized value 13 //说明还没有初始化,则跳出继续循环,直至初始化完成 14 continue; // spin 15 16 // 走到这里,说明已经初始化完成,但是初始化时head = h;tail = h;head和tail都是相同的空节点 17 // 如果h == t为false,则判断t.isdata == isdata,判断队尾节点和当前节点类型是否一致 18 // 队列空,或队列中节点类型和当前节点一致, 19 // 即我们说的第一种情况,将节点入队即可。读者要想着这块 if 里面方法其实就是入队 20 if (h == t || t.isdata == isdata) { // empty or same-mode 21 qnode tn = t.next; 22 // t != tail 说明刚刚有节点入队,continue 即可 23 if (t != tail) // inconsistent read 24 continue; 25 // 有其他节点入队,但是 tail 还是指向原来的,此时设置 tail 即可 26 if (tn != null) { // lagging tail 27 // 这个方法就是:如果 tail 此时为 t 的话,设置为 tn 28 advancetail(t, tn); 29 continue; 30 } 31 // 32 if (timed && nanos <= 0) // can't wait 33 return null; 34 // s == null,则创建一个新节点 35 if (s == null) 36 s = new qnode(e, isdata); 37 // 将当前节点,插入到 tail 的后面 38 if (!t.casnext(null, s)) // failed to link in 39 continue; 40 41 // 将当前节点设置为新的 tail 42 advancetail(t, s); // swing tail and wait 43 // 看到这里,请读者先往下滑到这个方法,看完了以后再回来这里,思路也就不会断了 44 object x = awaitfulfill(s, e, timed, nanos); 45 // 到这里,说明之前入队的线程被唤醒了,准备往下执行 46 // 若返回的x == s表示,当前线程已经超时或者中断,不然的话s == null或者是匹配的节点 47 if (x == s) { // wait was cancelled 48 clean(t, s); 49 return null; 50 } 51 // 若s节点被设置为取消 52 if (!s.isofflist()) { // not already unlinked 53 advancehead(t, s); // unlink if head 54 if (x != null) // and forget fields 55 s.item = s; 56 s.waiter = null; 57 } 58 return (x != null) ? x : e; 59 60 // 这里的 else 分支就是上面说的第二种情况,有相应的读或写相匹配的情况 61 } else { // complementary-mode 62 qnode m = h.next; // node to fulfill 63 // 不一致读,表明有其他线程修改了队列 64 if (t != tail || m == null || h != head) 65 continue; // inconsistent read 66 67 object x = m.item; 68 if (isdata == (x != null) || // m already fulfilled 69 x == m || // m cancelled 70 !m.casitem(x, e)) { // lost cas 71 advancehead(h, m); // dequeue and retry 72 continue; 73 } 74 75 advancehead(h, m); // successfully fulfilled 76 locksupport.unpark(m.waiter); 77 return (x != null) ? x : e; 78 } 79 } 80 } 81 82 void advancetail(qnode t, qnode nt) { 83 if (tail == t) 84 unsafe.compareandswapobject(this, tailoffset, t, nt); 85 }
1 // 自旋或阻塞,直到满足条件,这个方法返回 2 object awaitfulfill(qnode s, object e, boolean timed, long nanos) { 3 4 long lasttime = timed ? system.nanotime() : 0; 5 thread w = thread.currentthread(); 6 // 判断需要自旋的次数, 7 int spins = ((head.next == s) ? 8 (timed ? maxtimedspins : maxuntimedspins) : 0); 9 for (;;) { 10 // 如果被中断了,那么取消这个节点 11 if (w.isinterrupted()) 12 // 就是将当前节点 s 中的 item 属性设置为 this 13 s.trycancel(e); 14 object x = s.item; 15 // 这里是这个方法的唯一的出口 16 if (x != e) 17 return x; 18 // 如果需要,检测是否超时 19 if (timed) { 20 long now = system.nanotime(); 21 nanos -= now - lasttime; 22 lasttime = now; 23 if (nanos <= 0) { 24 s.trycancel(e); 25 continue; 26 } 27 } 28 if (spins > 0) 29 --spins; 30 // 如果自旋达到了最大的次数,那么检测 31 else if (s.waiter == null) 32 s.waiter = w; 33 // 如果自旋到了最大的次数,或者没有设置超时,那么线程挂起,等待唤醒 34 else if (!timed) 35 //挂起当前线程 36 locksupport.park(this); 37 else if (nanos > spinfortimeoutthreshold) 38 locksupport.parknanos(this, nanos); 39 } 40 } 41 42 void trycancel(object cmp) { 43 //将节点item设置为自己,代表此节点取消排队 44 unsafe.compareandswapobject(this, itemoffset, cmp, this); 45 }
现在我们来按照实际情况来走一遍流程:
1、线程1初始化 new synchronousqueue(true) ,调用 put(e o)写入值,我们看 transfer(object e, boolean timed, long nanos) 方法第7行,isdata 为true,接在到第20行,因为是刚刚初始化,tail和head都为空节点,36行新建一个节点,38行将当前节点,插入到 tail 的后面,42行将当前节点设置为新的 tail,所以队列中有三个节点,两个是空节点,一个是当前节点,我们再看到 awaitfulfill(qnode s, object e, boolean timed, long nanos) 方法第16行,此时x = e,不返回,36行处挂起当前线程。
此时线程2调用 take() 取值,我们看 transfer(object e, boolean timed, long nanos) 方法第7行,isdata 为false,接在到第20行,tail和head不相同,之前线程1写入时,qnode 的isdata 为true,所以 if (h == t || t.isdata == isdata) 不满足,进入到transfer的62行,取到头节点的后面一个节点,很明显,这个节点的item是null,68行处x != null为false, isdata == (x != null) 为true,则执行71行将头节点后移一位,相当于去除了头结点,跳出循环继续;这一次循环第62行取到的是线程1中添加的节点,x != null为true,isdata == (x != null)为false,x == m 为false,执行 m.casitem(x, e) ,此时e为null,将线程1中添加的节点的item的值设置为null,此时成功,不执行72行处,我们可以看到75行处,将头结点后移一位,相当于线程一put进去的值被移除了,76行处唤醒线程1,刚才说过这次循环62行取到的是线程1中添加的节点,68行处x为线程一中添加的元素,77行 return (x != null) ? x : e; x != null 为true,则retrue x,此时take()拿到了线程1中添加的元素,并唤醒线程1,将线程一添加的节点去除,take()方法结束。我们再来看看线程1被唤醒后,看 awaitfulfill 方法中第36行,被唤醒后接着for循环,第14行获取被挂起之前添加的节点中的item,可是上面讲的线程2中m.casitem(x, e) 已经将此节点的e设置为null ,则17行处retrue null,再到transfer 方法44行处,x为null,58行处返回e,put方法结束。
2、线程1初始化 new synchronousqueue(true) ,调用 take() 取值,我们看 transfer(object e, boolean timed, long nanos) 方法第7行,isdata 为false,,因为是刚刚初始化,tail和head都为空节点,36行新建一个节点,38行将当前节点,插入到 tail 的后面,42行将当前节点设置为新的 tail,所以队列中有三个节点,两个是空节点,一个是当前节点,我们再看到 awaitfulfill(qnode s, object e, boolean timed, long nanos) 方法第16行,此时x = e =null,不返回,36行处挂起当前线程。
此时线程2调用 put(e o)写入值,我们看 transfer(object e, boolean timed, long nanos) 方法第7行,isdata 为true,接在到第20行,tail和head不相同,之前线程1取值时,qnode 的isdata 为false,所以 if (h == t || t.isdata == isdata) 不满足,进入到transfer的62行,取到头节点的后面一个节点,很明显,这个节点的item是null,68行处x != null为false, isdata == (x != null) 为true,则执行71行将头节点后移一位,相当于去除了头结点,跳出循环继续;这一次循环第62行取到的是线程1中等待取值的节点,x != null为false,isdata == (x != null)为false,x == m 为false,执行 m.casitem(x, e) ,将线程1中take()的节点的item的值设置为e,此时成功,不执行72行处,我们可以看到75行处,将头结点后移一位,相当于线程1取到值后就将线程从等待队列中移除了,76行处唤醒线程1,刚才说过这次循环62行取到的是线程1中等待取值的节点,68行处x为null,77行 return (x != null) ? x : e; x != null 为false,则 retrue e,此时put()方法给线程1的qnode设置了item为e,并唤醒线程1,将线程一添加的节点去除,take()方法结束。我们再来看看线程1被唤醒后,看 awaitfulfill 方法中第36行,被唤醒后接着for循环,第14行获取被挂起之前添加的节点中的item,可是上面讲的线程2 put() 中m.casitem(x, e) 已经将此节点的e设置为 e ,则17行处retrue 的值为put()的 e ,再到transfer 方法44行处,x为e,58行处返回x,也就是put()的e ,take方法结束。
总结
1、线程做相同类型的操作:
多个线程 take() ,则将线程包装成qnode节点,item为null,将节点添加到队列,将线程挂起;
多个线程 put() ,将线程包装成qnode节点,item为 e,将节点添加到队列,将线程挂起。
2、线程做不同类型的操作:
有线程先做了put() ,其他线程做take() 操作时,take取到队列中的第一个等待节点中的item,take返回item,并将第一个等待节点唤醒,put返回e;
有线程先做了take() ,其他线程put() 操作时,put将元素e赋值给队列中第一个等待节点的item,put返回e,并将第一个等待节点唤醒,take返回e。
上一篇: 浅析Java中的23种设计模式
下一篇: 初识ActiveMQ
推荐阅读
-
并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析
-
并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)
-
Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)
-
并发编程(八)—— Java 并发队列 BlockingQueue 实现之 ArrayBlockingQueue 源码分析
-
并发编程(十)—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析
-
并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)
-
并发编程(九)—— Java 并发队列 BlockingQueue 实现之 LinkedBlockingQueue 源码分析
-
并发编程(八)—— Java 并发队列 BlockingQueue 实现之 ArrayBlockingQueue 源码分析
-
Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)