深入浅出阻塞队列原理
一、阻塞队列定义
阻塞队列(BlockingQueue)是一种可以在多线程环境下使用,并且支持阻塞等待的队列。支持在队列为空时,获取元素的线程会等待队列变为非空,当队列满时,存储元素的线程会等待队列可用
二、jdk提供的阻塞队列
-
ArrayBlockingQueue:
基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列 -
LinkedBlockingQueue:
基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE -
PriorityBlockingQueue:
以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为*阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列 -
DelayQueue:
基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个*队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞 -
SynchronousQueue:
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素 -
LinkedTransferQueue:
一个由链表结构组成的*阻塞队 -
LinkedBlockingDeque:
一个由链表结构组成的双向阻塞队列
三、特有方法
非阻塞队列方法:
-
add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
-
remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
-
offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
-
poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;
-
peek():获取队首元素,若成功,则返回队首元素;否则返回null
阻塞队列特有方法:
-
put(E e)
向队尾存入元素,如果队列满,则等待 -
take()
从队首取元素,如果队列为空,则等待 -
offer(E e,long timeout, TimeUnit unit)
向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true -
poll(long timeout, TimeUnit unit)
从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素
四、实现原理
采用生产消费模型
这里以ArrayBlockingQueue为例:
4.1 基本属性:
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;
// 控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
4.2 构造函数
//队列长度
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//指定队列长度和是否公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
ArrayBlockingQueue需要指定三个构造参数:
- 队列容量,其限制了队列中最多允许的元素个数
- 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
- 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中
4.3 核心方法分析
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加可中断锁
lock.lockInterruptibly();
try {
//数组占满,等待
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
//入队操作
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//插入位置后移1位,到达数组总长度重新拨回到起点
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
//取数据
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//为空则等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
//出队操作
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
//索引后移,达到数组长度后回到起点
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
ArrayBlockingQueue并发同步的原理是读操作和写操作都需要获取到 AQS 独占锁才能进行操作,如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。
五、总结
阻塞队列帮我们封装好了生产消费模型, 我们可以很方便的使用它们,比如线程池中的阻塞队列。SynchronousQueue在ScheduledThreadPoolExecutor 中得到应用,而PriorityBlockingQueue是基于堆排序实现的,后面会详细分析