Queue集合之PriorityBlockingQueue详解
集合系列文章
文章目录
前言
1、PriorityBlockingQueue是什么?
集合中*优先队列 priorityBlockingQueue内部使用堆算法保证每次出队都是优先级最高的元素,元素入队是如何建堆的,元素出堆是如何调整堆栈的平衡的?
PriorityBlockingQueue是带优先级的*阻塞队列,每次出队都返回优先级最好或者最低的元素,内部是平衡二叉树堆的实现。
2、查看类图接口
可以看到PriorityBlockingQueue内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLock 是个自旋锁,用CAS操作来保证只有一个线程可以扩容队列,
状态为0 或者1,其中0标示当前没有在进行扩容,1标示当前正在扩容。
3.源码解析
3.1 构造器
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
如上构造函数,默认队列容量为11,默认比较器为null,也就是使用元素的compareTo方法进行比较来确定元素的优先级,这意味着队列元素必须实现Comparable接口。
接下来我们主要看PriorityBlockingQueue的几个操作的源码,如下:
3.2 offer操作
1.offer 操作,offer操作的作用是在队列插入一个元素,由于是*队列,所以一直返回true,源码如下:
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//如果当前元素个数>=队列容量,则扩容(1)
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
//默认比较器为null (2)
if (cmp == null)
siftUpComparable(n, e, array);
else
//自定义比较器 (3)
siftUpUsingComparator(n, e, array, cmp);
//队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程(9)
size = n + 1;
notEmpty.signal();//激活调用take()方法被阻塞的线程
} finally {
//释放独占锁
lock.unlock();
}
return true;
}
上面代码所写的offer主流程代码比较简单,下面看PriorityBlockingQueue是如何扩容和内存建立堆的,首先扩容的代码如下:
3.2.1 扩容
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //释放获取的锁
Object[] newArray = null;
//cas成功则扩容(4)
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // 如果一开始容量很小,则扩容宽度变大
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // 可能溢出
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
//第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5)
if (newArray == null) // 如果两外一个线程正在分配,则让出
Thread.yield();
lock.lock();//(6)
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
tryGrow目的是扩容,这里可以想下为什么扩容前要先释放锁,然后使用CAS控制只有一个线程扩容成功?
- 其实这里不释放锁也是可以的,但是在整个扩容期间都持有锁,由于扩容是需要时间的,这个时间范围内持有锁,其他线程无法进行入队和出队的操作,这样并发的效率就降低了。所以在进行扩容操作前先释放锁,使用CAS只有一个线程可以进行扩容,其他线程可以进行入队和出堆操作。
- spinlock使用CAS控制只有一个线程可以进行扩容,CAS失败的线程会调用Thread.yield()让出cpu,目的是为了让扩容后的线程优先调用lock.lock()重新获得锁,但是这得不到一定的保证。有可能yield的线程在扩容线程完成前已经退出,并执行了代码(6)获取到了锁。如果当前数组扩容还没完毕,当前线程会再次调用tryGrow方法,然后释放锁,
- 这又给扩容线程获取锁提供了机会,如果这时候扩容线程还没扩容完毕,则当前线程释放锁后又调用yield方法让出CPU。可知当扩容线程进行扩容期间,其他线程是原地自旋通过代码(1)检查当前扩容是否完毕,等扩容完成后才退出代码(1)的循环。
- 当扩容线程扩容完毕后会重置自旋锁变量allocationSpinLock 为 0,这里并没有使用UNSAFE方法的CAS进行设置是因为同时只可能有一个线程获取了该锁,并且 allocationSpinLock 被修饰为了 volatile。
- 当扩容线程扩容完毕后会执行代码(6)获取锁,获取锁后复制当前queue里面的元素到新数组。
3.2.2 建堆算法
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
//队列元素个数>0则判断插入位置,否者直接入队(7)
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;(8)
}
3.2.3 图文解释
接下来用图来解释上面的算法过程,假设队列的初始容量为2,创建的优先队列的参数类型为Integer。
1.首先调用offer(2)方法,首先获取锁然后判断当前元素的个数是否大于队列的容量,明显小于,不扩容,然后开始建堆,然后 元素个数为0,(K>0)为false,直接将元素放入到对象数组中,然后元素个数+1,最后释放锁。
2.调用offer(4),判断当前元素的个数是否大于队列的容量,明显小于,不扩容,然后开始建堆,然后 元素个数为1,(K>0)为true,进入循环,比较当前元素是否大于父节点,大于直接break,然后将元素放入到对象数组中,然后元素个数+1,最后释放锁。
3.调用offer(6), 获取lock,判断当前元素的个数大于队列的容量,进行扩容,释放lock,通过CAS将allocationSpinLock修改为1,然后判断容量小于64,于是容量变为2 + 2 + 2 = 6,重新创建了一个长度为6的容器,然后将allocationSpinLock重新赋值为0,重新获取lock,并将原来的数组元素复制到新的数组。
扩容完毕后,进入建堆函数,k>0 为true,判断当前元素大于其父节点,直接放入到元素数组中,然后元素个数+1,最后释放锁。
4.调用offer(3),获取lock,判断当前元素的个数小于队列的容量,不扩容,进入建堆函数,k>0 为true,判断当前元素小于其父节点,将父节点放入到第4个位置
然后将父节点的位置2 赋值给k,然后判断父节点2小于当前元素3,退出循环,将档前元素放入到k的位置,然后元素个数+1,最后释放锁。
可知堆的根元素是 2,也就是这是一个最小堆,那么当调用这个优先级队列的 poll 方法时候,会一次返回堆里面值最小的元素。
3.3 poll操作
poll 操作作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//获取独占锁
try {
return dequeue();
} finally {
lock.unlock();//释放独占锁
}
}
如上代码可以知道在进行出队操作过程中要先加锁,这意味着,当前线程进行出队操作的时候,其他线程不能再进行入队和出队操作,但是从前面介绍offer函数的时候,知道这时候可以有其他线程进行扩容,
3.3.1 dequeue出队源码
private E dequeue() {
//队列为空,则返回null
int n = size - 1;
if (n < 0)
return null;
else {
//获取队头元素(1)
Object[] array = queue;
E result = (E) array[0];
//获取队尾元素,并值null(2)
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)//(3)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;//(4)
return result;
}
}
如上代码,如果队列为空则直接返回 null,否者执行代码(1)获取数组第一个元素作为返回值存放到变量 Result,这里要注意一下数组里面第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。
然后代码(2)获取队列尾部元素存放到变量X,并且置空尾部节点,然后执行代码(3)插入变量X 到数组下标为 0 的位置后,重新调整堆为最大或者最小堆,然后返回。
这里重要的是看如何去掉堆的根节点后,使用剩下的节点重新调整为一个最大或者最小堆。
3.3.2 siftDownComparable堆调整源码
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // 假设左边子树最小
Object c = array[child];(5)
int right = child + 1;(6)
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
c = array[child = right];
if (key.compareTo((T) c) <= 0)(8)
break;
array[k] = c;
k = child;
}
array[k] = key;(9)
}
}
先判断k < half = true ,然后进入循环,判断左子节点和右子节点的较小的元素,然后将当前父节点的元素与较小的子节点进行比较,如果小于则直接跳出循环,如果大于,则将该子节点放入到父节点的位置,然后将子节点的索引位置赋值给k,继续循环,最后将元素key赋值给array[k],堆调整结束。
总结
PriorityBlockingQueue 队列内部使用二叉树堆维护元素优先级,内部使用数组作为元素存储的数据结构,这个数组是可以扩容的,当前元素个数 >= 最大容量的时候会通过算法扩容,
出队的时候始终保证出队的元素是堆树的根节点,而不是在队列里面停留时间最长的元素,默认元素优先级比较规则是使用元素的compareTo方法来做,用户可以自定义优先级的比较优先级。
本文地址:https://blog.csdn.net/b1303110335/article/details/112221278
下一篇: flask配置https