欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

源码分析-ConcurrentLinkedQueue

程序员文章站 2022-06-07 22:57:49
...

一.序言

     现在并发操作中都要求高效,都在想怎么去掉直接加锁带来的线程切换的开销,这里分享自己对concurrentLinkedQueue  的部分代码的理解,看看他无锁的原因,了解大神的设计思路。

关于 它的工作流程 参考JDK1.6 :http://ifeve.com/concurrentlinkedqueue/ 

本文分析基于JDK 1.7.0_79

 

二.源码分析

    1.介绍:concurrentlinkedqueue 设计有head  和 tail 两个节点,以及节点类 Node,主要看Node 部分

      

  

 private static class Node<E> {
        // Node 里面包含了 item 节点值  以及 下一个节点 next
        // item 和 next 都是valatile  可见性保证了
        volatile E item;
        volatile Node<E> next;

        private static final sun.misc.Unsafe UNSAFE;
        // 并且初始化的时候 就会获得item 和 next 的偏移量
        // 这为后面的cas 做了准备,如何使用继续看下面
        private static final long itemOffset;
        private static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

}

    

 

   2.从常规的 offer 方法进入,因为并发的主要也是offer 和 remove 上的竞争。

     

public boolean offer(E e) {
        checkNotNull(e);// 检查,为空直接异常
        // 创建新节点,并将e 作为节点的item
        final Node<E> newNode = new Node<E>(e);
        // 这里操作比较多,将尾节点tail 赋给变量 t,p
        for (Node<E> t = tail, p = t;;) {
            // 并获取q 也就是 tail 的下一个节点
            Node<E> q = p.next;
            // 如果下一个节点是null,说明tail 是处于尾节点上
            if (q == null) {
                // 然后用cas 将下一个节点设置成为新节点
                // 这里用cas 操作,如果多线程的情况,总会有一个先执行成功,失败的线程继续执行循环。
                // 关于casNext 的分析,跳转 1.1
                // <A> 
                if (p.casNext(null, newNode)) {
                    // 如果p.casNext有个线程成功了,p=newNode 
                    // 比较 t (tail) 是不是 最后一个节点
                    if (p != t) 
                        // 如果不等,就利用cas将,尾节点移到最后
                        // 如果失败了,那么说明有其他线程已经把tail移动过,也是OK的
                        <B>
                        casTail(t, newNode);  
                    return true;
                }
                // 如果<A>失败了,说明肯定有个线程成功了,
                // 这时候失败的线程,又会执行for 循环,再次设值,直到成功。
            }
            else if (p == q) 
                // 有可能刚好插入一个,然后P 就被删除了,那么 p==q
                // 这时候在头结点需要从新定位。
                p = (t != (t = tail)) ? t : head;
            else
                // 这里是为了当P不是尾节点的时候,将P 移到尾节点,方便下一次插入
                // 也就是一直保持向前推进
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

  

 

   1.1 casNext 分析

    

// 对应上面的 Node<E> q = p.next;p.casNext(null,newNode)
// 他是一个Node 内的方法,
boolean casNext(Node<E> cmp, Node<E> val) {
            // 可以看到,它是用p.next (null) 未偏移量,设置新值的
            // cas 是可以根据内存中的偏移量 改变值,详细这里不解释
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 既然是可以并发执行,那么当多个线程同一时间执行到这里的时候,必然只有1个 成功,后面的都失// 败。关于成功和失败的处理 继续回到上面 1 

   

    3.poll 方法解释

    

public E poll() {
        // 设置起始点
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                // 利用cas 将第一个节点,设置未null
                if (item != null && p.casItem(item, null)) {
                    // 和上面类似,p的next被删了,
                    // 然后然后判断一下,目的为了保证head的next不为空
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    // 有可能已经被另外线程先删除了下一个节点
                    // 那么需要先设定head 的位置,并返回null
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    // 这个一般是删完了(有点模糊)
                    continue restartFromHead;
                else
                    // 和offer 类似,这历使用保证下一个节点有值,才能删除
                    p = q;
            }
        }
    }

 

 

   4.remove 方法

    

public boolean remove(Object o) {
        if (o == null) return false;
        Node<E> pred = null;
        // 这里是从head 开始的,中间还涉及到head 的判断等从操作
        // 跟一般for 循环类似,要遍历的- -,这样的操作o 很靠后的时候,会慢- -!
        // - -不太喜欢这方法,了解作用
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null &&
                    o.equals(item) &&
                    p.casItem(item, null)) {
                Node<E> next = succ(p);
                if (pred != null && next != null)
                    pred.casNext(p, next);
                return true;
            }
            pred = p;
        }
        return false;
    }

 

   5.size 分析

    

public int size() {
        int count = 0;
        // 很纠结的是,这里依然用循环获取,判断节点是否有值。然后累加
        // 比较伤,可能作者是考虑offer poll 等操作开始计算,难以控制,用这种原始的方法
        // 比较伤- -,做为了解
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null){
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
            }
        return count;
    }

 

   

   6.contains 方法

     

// 这个方法 也是用线性 遍历 比较的,也不多说了  
public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

 

三.小结

    1. 这个东东,方法很多,就没一一分析了,主要是了解他的 构成和主要的代码逻辑,当然我没吃透他- -,毕竟写不出来,或者很好的该进,有问题还请指出啦,谢谢。

   2.关于这个东东的应用,可能更倾向于offer 和poll 的高并发场景,而且你会发现每次都要创建node,cas 在竞争很激烈的情况,CPU会拉高,我觉得吧 可以采用,JDK1.8 Long 原子类增加的方法,多几个cell 减少cas 的问题。

   3.像size,remove 等方法,这个我是比建议大量使用的,毕竟每个类的特点,不能照顾全部的优势,用就要用她的优势吧,其他地方的话,可以适当改写,当然~。~还是等 完全掌握了才行吧~。~

   4.这个类的精华并不是cas 的应用,我感觉是它设计上 对几个变量的应用,保证的 前后的推进 以及判断,这个是非常厉害,不得不服啊

       

 

上一篇: flink-watermark

下一篇: 求职那些事