源码阅读(39):Java中线程安全的Queue、Deque结构——LinkedTransferQueue(2)
接上文《源码阅读(38):Java中线程安全的Queue、Deque结构——LinkedTransferQueue(1)》
2.3、LinkedTransferQueue的主要属性和构造函数
LinkedTransferQueue队列集合主要包括以下关键属性:
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
// 该节点指向单向链表中第一个可进行有效判定后续结点是否是出队操作的节点
transient volatile Node head;
// 该节点试图指向单向链表的尾部,但往往处理过程中都不能保证状态一直成立
private transient volatile Node tail;
// ......
}
LinkedTransferQueue队列的主要构造函数有以下两个:
- 默认的构造函数
// ......
public LinkedTransferQueue() {
head = tail = new Node();
}
// ......
默认构造函数进行了单向链表的初始化,即为单向链表创建一个虚节点。该节点的isData属性被赋值为true,它和该节点item属性实际引用数据对象的情况冲突(item属性为null)。这样的虚节点将在后续单向链表的工作中,被清除。
- 一个可以使用外部集合进行实例化的构造函数
public LinkedTransferQueue(Collection<? extends E> c) {
Node h = null, t = null;
for (E e : c) {
Node newNode = new Node(Objects.requireNonNull(e));
if (h == null) {
h = t = newNode;
}
else {
t.appendRelaxed(t = newNode);
}
}
if (h == null) {
h = t = new Node();
}
head = h;
tail = t;
}
// ......
2.4、xfer
xfer方法从字面上可以直译为“传送”,它是指通过多种操作模式,利用LinkedTransferQueue队列内置的单向链表,使数据对象在生产者和消费者间进行传递。xfer方法是LinkedTransferQueue队列最核心的操作方法之一,其支撑了诸如offer、add、put、transfer、tryTransfer、take、poll等方法的内部实现。JDK 9+开始,xfer操作的逻辑做了一次较大的改造,处理逻辑变得更加高效,本节开始我们将进行详细介绍。
xfer的调用方式主要分为四种工作模式,在xfer方法的入参中表现为四个不同的数值:
-
NOW (0):即时模式,当xfer被调用时,如果操作线程无法立即获取到调用结果,则返回null。NOW所代表的即时模式在生产者和消费者调用中都有应用场景,例如tryTransfer方法、poll方法。
-
SYNC (2):同步模式,当xfer被操作线程调用时,只有xfer操作过程达到了调用线程所期望的结果(或抛出异常),调用者才会继续向下执行,否则就一直处于阻塞状态下。比如在消费者使用SYNC同步模式调用xfer方法时,除非从LinkedTransferQueue队列中获取到了数据对象,否则消费者线程将一直处于阻塞状态。SYNC所表示的同步模式,在生产者和消费者调用中都有应用场景,例如transfer方法、take方法。
-
TIMED (3):限时(超时)模式,当xfer被操作线程调用时,如果在限定的时间内调用线程没有达到所期望的结果,调用者将不再等待结果,并解除阻塞状态。比如生产者在规定的时间内没有等到任何消费者取出对应的数据对象,则生产者不在继续等待。TIMED所表示的超时模式,在生产者和消费者调用中都有应用场景,例如tryTransfer方法、poll方法。
-
ASYNC (1):异步模式,当xfer被操作线程调用时,无论xfer操作过程时候完成,调用者都不会进行阻塞等待,后者会继续进行自身业务过程的处理。ASYNC所表示的异步模式出现在生产者调用的场景下,例如:offer方法、add方法、put方法。
以下代码为xfer方法的代码片段,我们先进行概要解读,然后再使用消费者线程和生产者线程的调用场景做示例讲解:
// ......
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null)) {
throw new NullPointerException();
}
// 最外层的for循环,遵循cas操作思想,只要操作不符合预期,就不停的重新操作
// 直到操作结果符合预期为止
restart: for (Node s = null, t = null, h = null;;) {
// 这是初始化是决定当前p的位置是依据当前单向链表的head节点进行引用还是依据当前单向链表的tail节点进行引用
// 其本质判断是当前操作是入队操作还是出队操作
// 其最本质的判定是当前xfer操作的性质(haveData)和当前链表tail引用位置所描述的操作性质(t.isData)是否一致
// 如果操作性质一致,当前xfer操作就是从tail引用位置开始判定和进行的入队操作
// 如果操作性质不一致,当前xfer操作就是从head引用位置开始判定和进行的出队操作
for (Node p = (t != (t = tail) && t.isData == haveData) ? t : (h = head);; ) {
final Node q; final Object item;
// 出队操作的场景,其处理策略在此代码段落
// 只有当前处理节点p的isData标识和入参的haveData标识一致
// 且当前处理节点p真实的数据对象存在情况和入参的haveData标识一致
if (p.isData != haveData && haveData == ((item = p.item) == null)) {
// 将局部变量h引用与当前单向链表的head位置
// 避免在多线程情况下head引用被改变引起的处理错误
if (h == null) {
h = head;
}
// 对当前节点进行原子性赋值:
// 如果是生产者任务从队列中取出,那么赋值成功后,当前节点p的item属性将为e(不会为null)
// 如果是消费者任务从队列中取出,那么赋值成功后,当前节点p的item属性将为null
if (p.tryMatch(item, e)) {
// 在双跳队列进行数据取数操作时,当前处理节点p可能和h不一致,但一定是在h代表的节点“附近”
// 所以,如果条件成立,就要进行以h代表的节点为基准的链表清理操作
if (h != p) {
skipDeadNodesNearHead(h, p);
}
return (E) item;
}
}
// 入队操作的场景,其处理策略在此代码段落
// 加入队列的可能是消费者任务,也可能是生产者任务
// 根据之前对单向链表tail引用位置的描述,tail引用的位置不一定是单向链表的最后一个节点
// 所以首先将p节点移动到链表的最后一个节点,否则就不进行业务逻辑处理
if ((q = p.next) == null) {
// 操作方式为NOW的入队操作,将会被忽略
if (how == NOW) {
return e;
}
// 入队操作需要生成一个新的Node节点
if (s == null) {
s = new Node(e);
}
// 使用原子操作,将当前操作s结点引用到当前p结点的item属性
// 如果操作失败,说明p结点的next操作已经被其它线程中的操作所引用,
// 那么通过内层的for循环继续进行操作
if (!p.casNext(null, s)) {
continue;
}
// 当前p节点引用的位置和t节点引用的可能是单向链表tail处的位置可能不一样
// 引起这个的原因可能有很多:
// a、当前xfer操作在中为p节点关联next属性的操作:p.casNext(null, s)不停失败,
// 不停的在第二层for循环中做q = p.next 和 p == (p = q) 操作
// b、虽然xfer操作成功了,但是当前线程连续进行了两次xfer调用操作(不好理解?后文将进行图例化讲解)
if (p != t) {
casTail(t, s);
}
if (how == ASYNC) {
return e;
}
return awaitMatch(s, p, e, (how == TIMED), nanos);
}
// 让p引用指向当前节点的下一个节点
// 如果当前节点的next属性指向自己,说明当前节点已经被移除队列
// 按照cas的思路,本次xfer操作需要重来
if (p == (p = q)) {
continue restart;
}
}
}
}
// ......
/** Tries to CAS-match this node; if successful, wakes waiter. */
// 这是LinkedTransferQueue.Node类中的方法
// 方法尝试如果当前Node对象的item属性值为cmp的情况下,重新赋值为val
// 如果设置成功,则解除当前Node所代表的等待线程的阻塞状态
// 这个阻塞状态的线程可能是生产者,也可能是生产者。
final boolean tryMatch(Object cmp, Object val) {
if (casItem(cmp, val)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}
// 这是LinkedTransferQueue.Node类中的方法
// 该方法尝试如果当前Node对象的item属性值为cmp的情况下,重新赋值为val,并返回true
// 否则就返回false
final boolean casItem(Object cmp, Object val) {
// assert isData == (cmp != null);
// assert isData == (val == null);
// assert !(cmp instanceof Node);
return ITEM.compareAndSet(this, cmp, val);
}
// ......
xfer方法一共有四个方法参数,这里说明如下:
e:该参数就是本次进行传输的数据对象,如果当前xfer方法被消费者线程端调用,则e为null。
haveData:该参数指示本次xfer方法的调用是否有数据对象通过上一个e参数进行传入,也就是说e和haveData这两个参数是配对使用的。当e为null时,haveData应该为false;反之当e不为nul时,haveData应该为true;
how:本次xfer方法的操作模式。一共有四种NOW, ASYNC, SYNC, 或者 TIMED,上文已经介绍过,这里就不再进行赘述了。
nanos:本次xfer方法的操作超时时间(单位纳秒),当本次操作的操作模式为TIMED时(限时/超时模式),需要通过该参数指定本次操作的超时时间。
在阅读xfer方法时,还应该特别注意:
单向链表中一定会有至少一个Node节点,既是LinkedTransferQueue队列集合通过默认的构造函数进行实例化时构建的“虚”节点,该节点的isData属性被标识为true,并且和该节点item属性实际引用数据对象的情况冲突(item属性为null),这样一来无论xfer操作进行的是入队操作还是出队操作,这个虚拟节点都会被排除在操作逻辑以外。并且因为q = p.next和p = q两个操作语句的缘故,代表当前正在处理的p引用会向单向链表的后续结点移动。下面几种单向链表的示意图都是在处理过程中可能出现的场景:
- head引用位置的节点是“虚”节点的,任务模式全为存储任务(生产者)的单向链表:
- 整个链表的任务模式全为取数任务(消费者)的单向链表:
- 整个链表的任务模式全为存储任务(生产者)的单向链表,但是tail引用位置不在链表末尾:
但是,以下情况的单向链表是不可能出现的——既有生产者任务,又有消费者任务:
我们设想一下这样的典型应用场景:有多个生产者线程调用LinkedTransferQueue队列的方法,将数据对象添加到队列中,接下来又有多个消费者线程从LinkedTransferQueue队列中取出数据对象。下面我们就用图文的方式,基于以上描述的xfer方法逻辑,对这种场景进行详细介绍,为了便于讨论,我们先介绍生产者的操作,再介绍消费者的操作。另外,我们假设构建LinkedTransferQueue队列时,都是使用的默认构造函数