JAVA并发:阻塞队列
在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁) 等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。
下面分析的是阻塞队列。
1.什么是阻塞队列
阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
-
支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。
-
支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素,消费者则从队列里取元素。
2.队列Queue接口核心方法
先简单介绍下JAVA容器中的队列有哪些接口及使用方法:
3.阻塞队列BlockigQueue接口核心方法
阻塞队列,本质上来说还是属于队列,也就是说阻塞队列继承了队列的功能。
4.常用的阻塞队列
-
ArrayBlockingQueue:由数组结构组成的有界阻塞队列
-
LinkedBlockingQueue:由链表结构组成的有界阻塞队列
-
PriorityBlockingQueue:支持优先级排序的*阻塞队列
-
DelayQueue:使用优先级队列实现的*阻塞队列
-
SynchronousQueue:不存储元素的阻塞队列
-
LinkedTransferQueue:由链表结构组成的*阻塞队列
-
LinkedBlockingDeque:由链表结构组成的双向阻塞队列
以下逐一介绍这7种阻塞队列
4.1ArrayBlockingQueue(有界阻塞队列)
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下采用非公平锁的方式实现,可以通过构造器传参控制是采用公平锁还是非公平锁实现。先看看ArrayBlockingQueue类图关系:
4.1.1阻塞队列的使用
模拟生产者与消费者
private static void consumeDemo() throws InterruptedException {
Thread consume = new Thread(() -> {
while (true) {
try {
Integer a = consumeQueue.take();
System.out.println("消费:" + a);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread product = new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
consumeQueue.put(i);
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
consume.start();
TimeUnit.SECONDS.sleep(5);
product.start();
}
4.1.2初始化队列
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化Object数组
this.items = new Object[capacity];
// 初始化锁(可以指定公平与非公平)
lock = new ReentrantLock(fair);
// 非空condition队列,队列空时用于阻塞获取元素的线程
notEmpty = lock.newCondition();
// 非满condition队列,队列满时用于阻塞添加元素的线程
notFull = lock.newCondition();
}
4.1.3添加元素
add、offer、put三个方法都是添加元素,但是各自的具体实现又不一样,各自的使用场景也不相同。这三个方法在添加元素的时候都会先获得Lock锁
,添加成功后释放锁
。这里主要介绍put
方法
public void put(E e) throws InterruptedException {
// 检查添加的元素不为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁,可异常中断
lock.lockInterruptibly();
try {
// 队列满了
while (count == items.length)
// 阻塞继续添加元素,直到唤醒
// 队列满了之后,当前线程添加元素会阻塞在此位置并释放锁。只有等其他线程移除元素后唤醒当前线程。才可以继续添加元素
notFull.await();
// 队列中元素没满,继续添加元素
enqueue(e);
} finally {
// 释放锁
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// putIndex维护当前元素添加到了哪个位置
items[putIndex] = x;
//
if (++putIndex == items.length)
// 重新从0开始计数,前面队列满了会被阻塞,而至少移除一个元素之后才会唤醒添加元素。
// 而移除元素也是从头开始移除的,所以不会覆盖没有消费的元素
putIndex = 0;
// 元素个数+1
count++;
// 唤醒获取元素的线程
// 获取元素时,有可能队列为空。会阻塞再次等待
notEmpty.signal();
}
4.1.4获取元素
remove、poll、take三个方法都是获取元素,但是各自的具体实现又不一样,各自的使用场景也不相同。这三个方法在获取元素的时候都会先获得Lock锁
,获取成功后释放锁
。这里主要介绍take
方法
peek与element方法与上面的方法又不相同,只会读取元素而不会把他们从队列中移除。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁,可异常中断
lock.lockInterruptibly();
try {
// 队列为空
while (count == 0)
// 阻塞当前线程获取元素
// 当队列中添加元素时,会唤醒此线程继续去获取队列中的元素
notEmpty.await();
// 队列不为空,获取元素
return dequeue();
} finally {
// 释放锁
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// takeIndex维护当前移除元素的位置
E x = (E) items[takeIndex];
// 移除元素所在的位置设置为null
items[takeIndex] = null;
//
if (++takeIndex == items.length)
// 如果移除元素已经到队列尾部
// 从0开始去获取元素
takeIndex = 0;
// 队列中的元素个数 - 1
count--;
// 针对使用迭代器得遍历
if (itrs != null)
itrs.elementDequeued();
// 移除元素之后唤醒添加元素的线程
notFull.signal();
return x;
}
使用take方法去阻塞获取元素,当阻塞时使用add方法也会继续take读取,只要是添加元素就会signal
4.2 LinkedBlockingQueue(有界阻塞队列)
由链表结构组成的有界阻塞队列,遵循先进先出(FIFO)的原则,和ArrayBlockingQueue的区别是ArrayBlockingQueue内部维护的是一个数组,通过数组下标来维护队列,而LinkedBlockingQueue维护的是一个链表,通过Node来维护队列。同时有两把锁(入队和出队用不同的锁),原子个数count是使用AtomicInteger修饰。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bPO7qlYr-1606406486535)(C:/Users/scarecrow/AppData/Roaming/Typora/typora-user-images/image-20201125221854821.png)]
4.2.1 初始化队列
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue(int capacity) {
// 容量<=0,直接异常
if (capacity <= 0) throw new IllegalArgumentException();
// 初始化容量
this.capacity = capacity;
// 初始化一个队列,head与last节点为相同的node节点
last = head = new Node<E>(null);
}
// Node节点,含有下一个节点的引用
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
4.2.2 添加元素
public void put(E e) throws InterruptedException {
// 元素为空,异常
if (e == null) throw new NullPointerException();
int c = -1;
// 初始化Node
Node<E> node = new Node<E>(e);
// 获得put锁
final ReentrantLock putLock = this.putLock;
// 队列的大小,AtomicInteger原子类,并发安全的。
final AtomicInteger count = this.count;
// 加锁
putLock.lockInterruptibly();
try {
// 队列满了
while (count.get() == capacity) {
// 阻塞其他线程继续添加元素
notFull.await();
}
// 队列没有满,将Node加入队列
enqueue(node);
// 获取count的值,并同时+1
c = count.getAndIncrement();
// 如果队列没有满(count还是原队列的大小)c + 1因为此线程put元素加入队列成功。
if (c + 1 < capacity)
// 唤醒 其他阻塞添加元素 的线程
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 说明在添加元素之前,队列是空的。有可能其他线程在获取元素,此时正在被阻塞。这里需要唤醒
if (c == 0)
// 唤醒获取元素的线程
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// 将添加的元素封装为node加入队列中。head指向空信息节点,不是具体添加的元素 ,但是它拥有添加元素的next引用
last = last.next = node;
}
private void signalNotEmpty() {
// 唤醒获取元素的线程
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
4.3.3获取元素
public E take() throws InterruptedException {
E x;
int c = -1;
// 队列的大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lockInterruptibly();
try {
// 队列为空,获取元素的线程阻塞
while (count.get() == 0) {
notEmpty.await();
}
// 把元素移除队列
x = dequeue();
// 先获得移除元素之前队列的大小,之后把值更新为 - 1
c = count.getAndDecrement();
// 队列中还存在元素,可以唤醒其他阻塞获取元素的线程去获得元素
if (c > 1)
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
// 队列满时,移除一个后需要唤醒加入元素的线层去添加元素
// c == capacity 为移除元素之前的容量,移除后,队列肯定不是满的
if (c == capacity)
signalNotFull();
return x;
}
// 把head节点的next节点设置为head,并返回其value值后把item设置为null
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
4.3LinkedBlockingDeque(有界阻塞双向队列)
LinkedBlockingDeque和LinkedBlockingQeque一样是由链表结构组成,但是LinkedBlockingDeque是个双向链表,比LinkedBlockingQeque多一个pre节点的引用。
双向队列因为多了一个操作队列的入口,所以相比较于LinkedBlockingQeque单向队列中多了addFirst、 addLast、offerFirst、offerLast、peekFirst、peekLast、putFirst、putLast、takeFirst、takeLast等方法。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst,而take方法却等同于takeFirst,使用时需要注意。
4.3.1初始化队列
transient Node<E> first;
transient Node<E> last;
/** Number of items in the deque */
private transient int count;
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
// 初始化的时候没有设置任何节点,仅仅只是设置了一个容量
public LinkedBlockingDeque(int capacity) {
// 初始化容量<=0,抛异常
if (capacity <= 0) throw new IllegalArgumentException();
// 设置队列的容量
this.capacity = capacity;
}
static final class Node<E> {
E item;
// prev节点
Node<E> prev;
// next节点
Node<E> next;
Node(E x) {
item = x;
}
}
4.3.2添加元素
4.3.2.1 putFirst
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列满了,阻塞当前线程添加元素
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
// 把当前节点,加入链表的首个位置
private boolean linkFirst(Node<E> node) {
if (count >= capacity)
return false;
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
notEmpty.signal();
return true;
}
4.3.2.1 putLast
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}
4.3.3移除元素
4.3.3.1takeFirst
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}
4.3.3.2takeLast
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}
4.4PriorityBlockingQueue(*阻塞队列)
PriorityBlockingQueue是一个支持优先级的*阻塞队列(大小受限于内存)。和前面介绍的三种有界队列相比,*队列的最大区别是即使初始化的时候指定了长度,那么当队列元素达到上限后队列也会自动进行扩容,所以PriorityBlockingQueue在添加元素的时候不会发生阻塞,而如果扩容后的大小超过了内存限制,会抛出OutOfMemoryError错误。
默认情况下PriorityBlockingQueue队列元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者在初始化时,可以指定构造参数Comparator来对元素进行排序。
注意:PriorityBlockingQueue不能保证相同优先级元素的顺序(即两个值排序一样时,不保证顺序)。类图如下:
可以看到提供了4个 构造器:
- PriorityBlockingQueue():
初始化一个默认大小(11)长度的队列,并使用默认自然排序。 - PriorityBlockingQueue(int):
初始化一个指定大小的长度的队列,并使用默认自然排序。 - PriorityBlockingQueue(int,Comparator):
初始化一个指定大小的队列,并按照指定比较器进行排序。 - PriorityBlockingQueue(Collection):
根据传入的集合进行初始化并堆化,如果当前集合是SortedSet或者PriorityBlockingQueue类型,则保持原有顺序,否则使用自然排序进行堆化。
4.4.1初始化
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
我们看到只有一个Condition队列,这个是用来阻塞出队线程的,入队线程不会被阻塞。
接下来我们主要看看第4个构造器,是如何初始化一个队列的:
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
// true表示需要堆化即需要重排序
boolean heapify = true; // true if not known to be in heap order
// true表示需要筛选空值
boolean screen = true; // true if must screen for nulls
// 如果集合是SortedSet类型则不需要堆化
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
// 如果集合是PriorityBlockingQueue类型则不需要筛选空值
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
// 如果pq就是一个PriorityBlockingQueue则不需要堆化
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
// 把集合转为
Object[] a = c.toArray();
// 集合的长度
int n = a.length;
// //如果c.torray()失败,重新复制一个数组
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
// 堆化(排序)
heapify();
}