Java concurrency集合之ConcurrentLinkedQueue_动力节点Java学院整理
concurrentlinkedqueue介绍
concurrentlinkedqueue是线程安全的队列,它适用于“高并发”的场景。
它是一个基于链接节点的*线程安全队列,按照 fifo(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。
concurrentlinkedqueue原理和数据结构
concurrentlinkedqueue的数据结构,如下图所示:
说明:
1. concurrentlinkedqueue继承于abstractqueue。
2. concurrentlinkedqueue内部是通过链表来实现的。它同时包含链表的头节点head和尾节点tail。concurrentlinkedqueue按照 fifo(先进先出)原则对元素进行排序。元素都是从尾部插入到链表,从头部开始返回。
3. concurrentlinkedqueue的链表node中的next的类型是volatile,而且链表数据item的类型也是volatile。关于volatile,我们知道它的语义包含:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。concurrentlinkedqueue就是通过volatile来实现多线程对竞争资源的互斥访问的。
concurrentlinkedqueue函数列表
// 创建一个最初为空的 concurrentlinkedqueue。 concurrentlinkedqueue() // 创建一个最初包含给定 collection 元素的 concurrentlinkedqueue,按照此 collection 迭代器的遍历顺序来添加元素。 concurrentlinkedqueue(collection<? extends e> c) // 将指定元素插入此队列的尾部。 boolean add(e e) // 如果此队列包含指定元素,则返回 true。 boolean contains(object o) // 如果此队列不包含任何元素,则返回 true。 boolean isempty() // 返回在此队列元素上以恰当顺序进行迭代的迭代器。 iterator<e> iterator() // 将指定元素插入此队列的尾部。 boolean offer(e e) // 获取但不移除此队列的头;如果此队列为空,则返回 null。 e peek() // 获取并移除此队列的头,如果此队列为空,则返回 null。 e poll() // 从队列中移除指定元素的单个实例(如果存在)。 boolean remove(object o) // 返回此队列中的元素数量。 int size() // 返回以恰当顺序包含此队列所有元素的数组。 object[] toarray() // 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。 <t> t[] toarray(t[] a)
下面从concurrentlinkedqueue的创建,添加,删除这几个方面对它进行分析。
1 创建
下面以concurrentlinkedqueue()来进行说明。
public concurrentlinkedqueue() { head = tail = new node<e>(null); }
说明:在构造函数中,新建了一个“内容为null的节点”,并设置表头head和表尾tail的值为新节点。
head和tail的定义如下:
private transient volatile node<e> head; private transient volatile node<e> tail;
head和tail都是volatile类型,他们具有volatile赋予的含义:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。
node的声明如下:
private static class node<e> { volatile e item; volatile node<e> next; node(e item) { unsafe.putobject(this, itemoffset, item); } boolean casitem(e cmp, e val) { return unsafe.compareandswapobject(this, itemoffset, cmp, val); } void lazysetnext(node<e> val) { unsafe.putorderedobject(this, nextoffset, val); } boolean casnext(node<e> cmp, node<e> val) { return unsafe.compareandswapobject(this, nextoffset, cmp, val); } // unsafe mechanics private static final sun.misc.unsafe unsafe; private static final long itemoffset; private static final long nextoffset; static { try { unsafe = sun.misc.unsafe.getunsafe(); class k = node.class; itemoffset = unsafe.objectfieldoffset (k.getdeclaredfield("item")); nextoffset = unsafe.objectfieldoffset (k.getdeclaredfield("next")); } catch (exception e) { throw new error(e); } } }
说明:
node是个单向链表节点,next用于指向下一个node,item用于存储数据。node中操作节点数据的api,都是通过unsafe机制的cas函数实现的;例如casnext()是通过cas函数“比较并设置节点的下一个节点”。
2. 添加
下面以add(e e)为例对concurrentlinkedqueue中的添加进行说明。
public boolean add(e e) { return offer(e); }
说明:add()实际上是调用的offer()来完成添加操作的。
offer()的源码如下:
public boolean offer(e e) { // 检查e是不是null,是的话抛出nullpointerexception异常。 checknotnull(e); // 创建新的节点 final node<e> newnode = new node<e>(e); // 将“新的节点”添加到链表的末尾。 for (node<e> t = tail, p = t;;) { node<e> q = p.next; // 情况1:q为空 if (q == null) { // cas操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newnode。 // 如果该cas操作成功的话,则比较“p和t”(若p不等于t,则设置newnode为新的尾节点),然后返回true。 // 如果该cas操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。 if (p.casnext(null, newnode)) { if (p != t) // hop two nodes at a time castail(t, newnode); // failure is ok. return true; } } // 情况2:p和q相等 else if (p == q) p = (t != (t = tail)) ? t : head; // 情况3:其它 else p = (p != t && t != (t = tail)) ? t : q; } }
说明:offer(e e)的作用就是将元素e添加到链表的末尾。offer()比较的地方是理解for循环,下面区分3种情况对for进行分析。
情况1 -- q为空。这意味着q是尾节点的下一个节点。此时,通过p.casnext(null, newnode)将“p的下一个节点设为newnode”,若设置成功的话,则比较“p和t”(若p不等于t,则设置newnode为新的尾节点),然后返回true。否则的话(意味着“其它线程对尾节点进行了修改”),什么也不做,继续进行for循环。
p.casnext(null, newnode),是调用cas对p进行操作。若“p的下一个节点等于null”,则设置“p的下一个节点等于newnode”;设置成功的话,返回true,失败的话返回false。
情况2 -- p和q相等。这种情况什么时候会发生呢?通过“情况3”,我们知道,经过“情况3”的处理后,p的值可能等于q。
此时,若尾节点没有发生变化的话,那么,应该是头节点发生了变化,则设置p为头节点,然后重新遍历链表;否则(尾节点变化的话),则设置p为尾节点。
情况3 -- 其它。
我们将p = (p != t && t != (t = tail)) ? t : q;转换成如下代码。
if (p==t) { p = q; } else { node<e> tmp=t; t = tail; if (tmp==t) { p=q; } else { p=t; } }
如果p和t相等,则设置p为q。否则的话,判断“尾节点是否发生变化”,没有变化的话,则设置p为q;否则,设置p为尾节点。
checknotnull()的源码如下:
private static void checknotnull(object v) { if (v == null) throw new nullpointerexception(); }
3. 删除
下面以poll()为例对concurrentlinkedqueue中的删除进行说明。
public e poll() { // 设置“标记” restartfromhead: for (;;) { for (node<e> h = head, p = h, q;;) { e item = p.item; // 情况1 // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话; // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。 if (item != null && p.casitem(item, null)) { if (p != h) // hop two nodes at a time updatehead(h, ((q = p.next) != null) ? q : p); return item; } // 情况2 // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。 else if ((q = p.next) == null) { updatehead(h, p); return null; } // 情况3 // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartfromhead标记重新操作。 else if (p == q) continue restartfromhead; // 情况4 // 设置p为q else p = q; } } }
说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。
情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。
p.casitem(item, null) -- 调用cas函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。
在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updatehead()更新表头;然后返回删除节点的item值。
updatehead()的源码如下:
final void updatehead(node<e> h, node<e> p) { if (h != p && cashead(h, p)) h.lazysetnext(h); }
说明:updatehead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。
cashead(h,p)是通过cas函数设置表头,若表头等于h的话,则设置表头为p。
lazysetnext()的源码如下:
void lazysetnext(node<e> val) { unsafe.putorderedobject(this, nextoffset, val); }
putorderedobject()函数,我们在前面一章“todo”中介绍过。h.lazysetnext(h)的作用是通过cas函数设置h的下一个节点为h自身,该设置可能会延迟执行。
情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。
则调用updatehead(h, p),将表头更新p;然后返回null。
情况3:p=q
在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartfromhead标记重新操作。
情况4:其它情况。
设置p=q。
concurrentlinkedqueue示例
import java.util.*; import java.util.concurrent.*; /* * concurrentlinkedqueue是“线程安全”的队列,而linkedlist是非线程安全的。 * * 下面是“多个线程同时操作并且遍历queue”的示例 * (01) 当queue是concurrentlinkedqueue对象时,程序能正常运行。 * (02) 当queue是linkedlist对象时,程序会产生concurrentmodificationexception异常。 * * */ public class concurrentlinkedqueuedemo1 { // todo: queue是linkedlist对象时,程序会出错。 //private static queue<string> queue = new linkedlist<string>(); private static queue<string> queue = new concurrentlinkedqueue<string>(); public static void main(string[] args) { // 同时启动两个线程对queue进行操作! new mythread("ta").start(); new mythread("tb").start(); } private static void printall() { string value; iterator iter = queue.iterator(); while(iter.hasnext()) { value = (string)iter.next(); system.out.print(value+", "); } system.out.println(); } private static class mythread extends thread { mythread(string name) { super(name); } @override public void run() { int i = 0; while (i++ < 6) { // “线程名” + "-" + "序号" string val = thread.currentthread().getname()+i; queue.add(val); // 通过“iterator”遍历queue。 printall(); } } } }
(某一次)运行结果:
ta1, ta1, tb1, tb1, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, ta3, ta1, tb3, tb1, ta4, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, tb4, ta5, tb5, ta6, tb6,
结果说明:如果将源码中的queue改成linkedlist对象时,程序会产生concurrentmodificationexception异常。