源码分析-ConcurrentLinkedQueue
一.序言
现在并发操作中都要求高效,都在想怎么去掉直接加锁带来的线程切换的开销,这里分享自己对concurrentLinkedQueue 的部分代码的理解,看看他无锁的原因,了解大神的设计思路。
关于 它的工作流程 参考JDK1.6 :http://ifeve.com/concurrentlinkedqueue/
本文分析基于JDK 1.7.0_79
二.源码分析
1.介绍:concurrentlinkedqueue 设计有head 和 tail 两个节点,以及节点类 Node,主要看Node 部分
private static class Node<E> { // Node 里面包含了 item 节点值 以及 下一个节点 next // item 和 next 都是valatile 可见性保证了 volatile E item; volatile Node<E> next; private static final sun.misc.Unsafe UNSAFE; // 并且初始化的时候 就会获得item 和 next 的偏移量 // 这为后面的cas 做了准备,如何使用继续看下面 private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } }
2.从常规的 offer 方法进入,因为并发的主要也是offer 和 remove 上的竞争。
public boolean offer(E e) { checkNotNull(e);// 检查,为空直接异常 // 创建新节点,并将e 作为节点的item final Node<E> newNode = new Node<E>(e); // 这里操作比较多,将尾节点tail 赋给变量 t,p for (Node<E> t = tail, p = t;;) { // 并获取q 也就是 tail 的下一个节点 Node<E> q = p.next; // 如果下一个节点是null,说明tail 是处于尾节点上 if (q == null) { // 然后用cas 将下一个节点设置成为新节点 // 这里用cas 操作,如果多线程的情况,总会有一个先执行成功,失败的线程继续执行循环。 // 关于casNext 的分析,跳转 1.1 // <A> if (p.casNext(null, newNode)) { // 如果p.casNext有个线程成功了,p=newNode // 比较 t (tail) 是不是 最后一个节点 if (p != t) // 如果不等,就利用cas将,尾节点移到最后 // 如果失败了,那么说明有其他线程已经把tail移动过,也是OK的 <B> casTail(t, newNode); return true; } // 如果<A>失败了,说明肯定有个线程成功了, // 这时候失败的线程,又会执行for 循环,再次设值,直到成功。 } else if (p == q) // 有可能刚好插入一个,然后P 就被删除了,那么 p==q // 这时候在头结点需要从新定位。 p = (t != (t = tail)) ? t : head; else // 这里是为了当P不是尾节点的时候,将P 移到尾节点,方便下一次插入 // 也就是一直保持向前推进 p = (p != t && t != (t = tail)) ? t : q; } }
1.1 casNext 分析
// 对应上面的 Node<E> q = p.next;p.casNext(null,newNode) // 他是一个Node 内的方法, boolean casNext(Node<E> cmp, Node<E> val) { // 可以看到,它是用p.next (null) 未偏移量,设置新值的 // cas 是可以根据内存中的偏移量 改变值,详细这里不解释 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 既然是可以并发执行,那么当多个线程同一时间执行到这里的时候,必然只有1个 成功,后面的都失// 败。关于成功和失败的处理 继续回到上面 1
3.poll 方法解释
public E poll() { // 设置起始点 restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 利用cas 将第一个节点,设置未null if (item != null && p.casItem(item, null)) { // 和上面类似,p的next被删了, // 然后然后判断一下,目的为了保证head的next不为空 if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // 有可能已经被另外线程先删除了下一个节点 // 那么需要先设定head 的位置,并返回null updateHead(h, p); return null; } else if (p == q) // 这个一般是删完了(有点模糊) continue restartFromHead; else // 和offer 类似,这历使用保证下一个节点有值,才能删除 p = q; } } }
4.remove 方法
public boolean remove(Object o) { if (o == null) return false; Node<E> pred = null; // 这里是从head 开始的,中间还涉及到head 的判断等从操作 // 跟一般for 循环类似,要遍历的- -,这样的操作o 很靠后的时候,会慢- -! // - -不太喜欢这方法,了解作用 for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && o.equals(item) && p.casItem(item, null)) { Node<E> next = succ(p); if (pred != null && next != null) pred.casNext(p, next); return true; } pred = p; } return false; }
5.size 分析
public int size() { int count = 0; // 很纠结的是,这里依然用循环获取,判断节点是否有值。然后累加 // 比较伤,可能作者是考虑offer poll 等操作开始计算,难以控制,用这种原始的方法 // 比较伤- -,做为了解 for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null){ // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; } return count; }
6.contains 方法
// 这个方法 也是用线性 遍历 比较的,也不多说了 public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && o.equals(item)) return true; } return false; }
三.小结
1. 这个东东,方法很多,就没一一分析了,主要是了解他的 构成和主要的代码逻辑,当然我没吃透他- -,毕竟写不出来,或者很好的该进,有问题还请指出啦,谢谢。
2.关于这个东东的应用,可能更倾向于offer 和poll 的高并发场景,而且你会发现每次都要创建node,cas 在竞争很激烈的情况,CPU会拉高,我觉得吧 可以采用,JDK1.8 Long 原子类增加的方法,多几个cell 减少cas 的问题。
3.像size,remove 等方法,这个我是比建议大量使用的,毕竟每个类的特点,不能照顾全部的优势,用就要用她的优势吧,其他地方的话,可以适当改写,当然~。~还是等 完全掌握了才行吧~。~
4.这个类的精华并不是cas 的应用,我感觉是它设计上 对几个变量的应用,保证的 前后的推进 以及判断,这个是非常厉害,不得不服啊
上一篇: flink-watermark
下一篇: 求职那些事