欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Queue集合之PriorityBlockingQueue详解

程序员文章站 2022-06-24 10:28:50
集合系列文章文章目录集合系列文章前言1、PriorityBlockingQueue是什么?2、查看类图接口3.源码解析3.1 构造器3.2 offer操作3.2.1 扩容3.2.2 建堆算法3.2.3 图文解释3.3 poll操作3.3.1 dequeue出队源码3.3.2 siftDownComparable堆调整源码总结前言1、PriorityBlockingQueue是什么?集合中*优先队列 priorityBlockingQueue内部使用堆算法保证每次出队都是优先级最高的元素,元素入...

集合系列文章


前言

1、PriorityBlockingQueue是什么?

集合中*优先队列 priorityBlockingQueue内部使用堆算法保证每次出队都是优先级最高的元素,元素入队是如何建堆的,元素出堆是如何调整堆栈的平衡的?

PriorityBlockingQueue是带优先级的*阻塞队列,每次出队都返回优先级最好或者最低的元素,内部是平衡二叉树堆的实现。

2、查看类图接口

Queue集合之PriorityBlockingQueue详解

可以看到PriorityBlockingQueue内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLock 是个自旋锁,用CAS操作来保证只有一个线程可以扩容队列,
状态为0 或者1,其中0标示当前没有在进行扩容,1标示当前正在扩容。

3.源码解析

3.1 构造器

    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

如上构造函数,默认队列容量为11,默认比较器为null,也就是使用元素的compareTo方法进行比较来确定元素的优先级,这意味着队列元素必须实现Comparable接口。

接下来我们主要看PriorityBlockingQueue的几个操作的源码,如下:

3.2 offer操作

1.offer 操作,offer操作的作用是在队列插入一个元素,由于是*队列,所以一直返回true,源码如下:

public boolean offer(E e) {

    if (e == null)
        throw new NullPointerException();

    //获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();

    int n, cap;
    Object[] array;

    //如果当前元素个数>=队列容量,则扩容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);

    try {
        Comparator<? super E> cmp = comparator;

        //默认比较器为null (2)
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            //自定义比较器 (3)
            siftUpUsingComparator(n, e, array, cmp);

        //队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程(9)
        size = n + 1;
        notEmpty.signal();//激活调用take()方法被阻塞的线程
    } finally {
        //释放独占锁
        lock.unlock();
    }
    return true;
}

上面代码所写的offer主流程代码比较简单,下面看PriorityBlockingQueue是如何扩容和内存建立堆的,首先扩容的代码如下:

3.2.1 扩容

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); //释放获取的锁
    Object[] newArray = null;

    //cas成功则扩容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // 如果一开始容量很小,则扩容宽度变大
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // 可能溢出
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }

    //第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5)
    if (newArray == null) // 如果两外一个线程正在分配,则让出
        Thread.yield();
    lock.lock();//(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow目的是扩容,这里可以想下为什么扩容前要先释放锁,然后使用CAS控制只有一个线程扩容成功?

  1. 其实这里不释放锁也是可以的,但是在整个扩容期间都持有锁,由于扩容是需要时间的,这个时间范围内持有锁,其他线程无法进行入队和出队的操作,这样并发的效率就降低了。所以在进行扩容操作前先释放锁,使用CAS只有一个线程可以进行扩容,其他线程可以进行入队和出堆操作。
  2. spinlock使用CAS控制只有一个线程可以进行扩容,CAS失败的线程会调用Thread.yield()让出cpu,目的是为了让扩容后的线程优先调用lock.lock()重新获得锁,但是这得不到一定的保证。有可能yield的线程在扩容线程完成前已经退出,并执行了代码(6)获取到了锁。如果当前数组扩容还没完毕,当前线程会再次调用tryGrow方法,然后释放锁,
  3. 这又给扩容线程获取锁提供了机会,如果这时候扩容线程还没扩容完毕,则当前线程释放锁后又调用yield方法让出CPU。可知当扩容线程进行扩容期间,其他线程是原地自旋通过代码(1)检查当前扩容是否完毕,等扩容完成后才退出代码(1)的循环。
  4. 当扩容线程扩容完毕后会重置自旋锁变量allocationSpinLock 为 0,这里并没有使用UNSAFE方法的CAS进行设置是因为同时只可能有一个线程获取了该锁,并且 allocationSpinLock 被修饰为了 volatile。
  5. 当扩容线程扩容完毕后会执行代码(6)获取锁,获取锁后复制当前queue里面的元素到新数组。

3.2.2 建堆算法

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;

    //队列元素个数>0则判断插入位置,否者直接入队(7)
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;(8)
}

3.2.3 图文解释

接下来用图来解释上面的算法过程,假设队列的初始容量为2,创建的优先队列的参数类型为Integer。
Queue集合之PriorityBlockingQueue详解
1.首先调用offer(2)方法,首先获取锁然后判断当前元素的个数是否大于队列的容量,明显小于,不扩容,然后开始建堆,然后 元素个数为0,(K>0)为false,直接将元素放入到对象数组中,然后元素个数+1,最后释放锁。
Queue集合之PriorityBlockingQueue详解
2.调用offer(4),判断当前元素的个数是否大于队列的容量,明显小于,不扩容,然后开始建堆,然后 元素个数为1,(K>0)为true,进入循环,比较当前元素是否大于父节点,大于直接break,然后将元素放入到对象数组中,然后元素个数+1,最后释放锁。
Queue集合之PriorityBlockingQueue详解
3.调用offer(6), 获取lock,判断当前元素的个数大于队列的容量,进行扩容,释放lock,通过CAS将allocationSpinLock修改为1,然后判断容量小于64,于是容量变为2 + 2 + 2 = 6,重新创建了一个长度为6的容器,然后将allocationSpinLock重新赋值为0,重新获取lock,并将原来的数组元素复制到新的数组。
Queue集合之PriorityBlockingQueue详解
扩容完毕后,进入建堆函数,k>0 为true,判断当前元素大于其父节点,直接放入到元素数组中,然后元素个数+1,最后释放锁。
Queue集合之PriorityBlockingQueue详解
4.调用offer(3),获取lock,判断当前元素的个数小于队列的容量,不扩容,进入建堆函数,k>0 为true,判断当前元素小于其父节点,将父节点放入到第4个位置
Queue集合之PriorityBlockingQueue详解
然后将父节点的位置2 赋值给k,然后判断父节点2小于当前元素3,退出循环,将档前元素放入到k的位置,然后元素个数+1,最后释放锁。
Queue集合之PriorityBlockingQueue详解
Queue集合之PriorityBlockingQueue详解

可知堆的根元素是 2,也就是这是一个最小堆,那么当调用这个优先级队列的 poll 方法时候,会一次返回堆里面值最小的元素。

3.3 poll操作

poll 操作作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();//获取独占锁
    try {
        return dequeue();
    } finally {
        lock.unlock();//释放独占锁
    }
}

如上代码可以知道在进行出队操作过程中要先加锁,这意味着,当前线程进行出队操作的时候,其他线程不能再进行入队和出队操作,但是从前面介绍offer函数的时候,知道这时候可以有其他线程进行扩容,

3.3.1 dequeue出队源码

private E dequeue() {

    //队列为空,则返回null
    int n = size - 1;
    if (n < 0)
        return null;
    else {

        //获取队头元素(1)
        Object[] array = queue;
        E result = (E) array[0];

        //获取队尾元素,并值null(2)
        E x = (E) array[n];
        array[n] = null;

        Comparator<? super E> cmp = comparator;
        if (cmp == null)//(3)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;//(4)
        return result;
    }
}

如上代码,如果队列为空则直接返回 null,否者执行代码(1)获取数组第一个元素作为返回值存放到变量 Result,这里要注意一下数组里面第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。

然后代码(2)获取队列尾部元素存放到变量X,并且置空尾部节点,然后执行代码(3)插入变量X 到数组下标为 0 的位置后,重新调整堆为最大或者最小堆,然后返回。

这里重要的是看如何去掉堆的根节点后,使用剩下的节点重新调整为一个最大或者最小堆。

3.3.2 siftDownComparable堆调整源码

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // 假设左边子树最小
                Object c = array[child];5int right = child + 1;6)
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)(8)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;(9)
        }
    }

先判断k < half = true ,然后进入循环,判断左子节点和右子节点的较小的元素,然后将当前父节点的元素与较小的子节点进行比较,如果小于则直接跳出循环,如果大于,则将该子节点放入到父节点的位置,然后将子节点的索引位置赋值给k,继续循环,最后将元素key赋值给array[k],堆调整结束。

总结

PriorityBlockingQueue 队列内部使用二叉树堆维护元素优先级,内部使用数组作为元素存储的数据结构,这个数组是可以扩容的,当前元素个数 >= 最大容量的时候会通过算法扩容,

出队的时候始终保证出队的元素是堆树的根节点,而不是在队列里面停留时间最长的元素,默认元素优先级比较规则是使用元素的compareTo方法来做,用户可以自定义优先级的比较优先级。

本文地址:https://blog.csdn.net/b1303110335/article/details/112221278