[java队列]——LinkedBlockingQueue
程序员文章站
2022-06-17 09:49:05
...
[java队列]——LinkedBlockingQueue
回顾ArrayBlockingQueue
之前介绍过[java队列]——ArrayBlockingQueue,这里先回顾一下ArrayBlockingQueue的特点:
- 数组实现
- 线程安全,利用可重入锁控制非空和非满两个Condition进行并发控制
- 不需要扩容,数组大小固定,利用循环数组
缺点: - 队列长度在初始化时指定,并且不会自动扩容,选择容量时需谨慎
- 若队列长期为空或者为满,会导致对应的取、存线程一直处于阻塞,阻塞线程会越变越多
- 只使用一个锁来控制,并发效率低
LinkedBlockingQueue介绍
- 单链表实现
- 线程安全
- 有界
LinkedBlockingQueue内部实现
基本属性
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//链表节点
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//容量
private final int capacity;
//元素个数
private final AtomicInteger count = new AtomicInteger();
//头结点
transient Node<E> head;
//尾节点
private transient Node<E> last;
//取锁
private final ReentrantLock takeLock = new ReentrantLock();
//取锁的非空条件
private final Condition notEmpty = takeLock.newCondition();
//存锁
private final ReentrantLock putLock = new ReentrantLock();
//存锁的非满条件
private final Condition notFull = putLock.newCondition();
构造方法
public LinkedBlockingQueue() {
//默认初始化最大容量
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
结论:
- 默认初始化最大容量 Integer.MAX_VALUE
非空非满条件通知
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//取锁
takeLock.lock();
try {
//非空通知
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
//存锁
putLock.lock();
try {
//非满通知
notFull.signal();
} finally {
putLock.unlock();
}
}
结论:
- 在非空通知前,加取锁。避免通知之前,队列元素被别的线程取走变成空队列
- 在非满通知前,加存锁。避免通知之前,队列被别的线程存入元素变成满队列
入队
入队与之前分析ArrayBlockingQueue一样,同样有四个方法,并且代码及功能类似。这里我重点讲解put(E e)方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//加存锁
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//队列满,则阻塞
notFull.await();
}
//未满则入队
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//根据前面分析,发非满信号是要加锁的。为了减少锁的次数,在出队时,只有出队之前队列是满,才会发出非满信号
//因为代码执行到这里本身就拿到锁了。所以这里就顺便检测一下,如果未满则发出心信号。。这样就不需要在每次出队都去唤醒。
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
//如果存元素前,队列长度是0,就发出非空信号
signalNotEmpty();
}
结论:
- 使用putLock锁
- 如果队列满了则阻塞,否则入队
- 入队之后,如果队列仍非满,则发出未满信号,让其他等待线程可以继续入队
- 如果入队前,队列长度是0,则发出非空信号
出队
出队同样有多个方法,这里重点介绍take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//加取锁
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//若队列为空,则阻塞等待
notEmpty.await();
}
//若队列非空,则出队
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//同样的道理,前面分析过,入队时,如果入队前的队列为空,才会释放非空信号。这样是为了减少加锁次数
//因此这里再出队后,判断队列非空,由于本身就持有锁,就顺便发出通知。
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
//如果出队前队列是满的,则出队后,发出非满信号
signalNotFull();
return x;
}
结论(与入队类似):
- 使用takeLock锁
- 如果队列空了则阻塞,否则出队
- 出队之后,如果队列仍非空,则发出非空信号,让其他等待线程可以继续出队
- 如果出队前,队列长度是满的,则发出非满信号
LinkedBlockingQueue结论
- 单链表实现
- 线程安全,使用存、取两把锁进行锁分离,实现入队出队互不阻塞
- 有界,可以传入容量。不传入则为最大容量
与ArrayBlockingQueue比较
队列 | 锁实现 | 是否有界 | 默认队列大小 |
---|---|---|---|
ArrayBlockingQueue | 只使用一把锁,效率较低 | 有界(可能导致线程阻塞) | 必须传入容量 |
LinkedBlockingQueue | 使用两把锁,效率较高 | 有界(可能导致线程阻塞) | 可不传,默认int的最大值(可能导致大量入队,占用大量内存) |