Java队列之ArrayBlockingQueue源码解析
程序员文章站
2024-03-18 08:41:34
...
目录
1.ArrayBlockingQueue
1.1整体架构
- 有界的阻塞队列,容量一旦创建,就无法进行修改
- 队列满时,往队列中 put 数据会被阻塞,队列空时,往队列中拿数据也会被阻塞。
// 队列存放在 object 的数组里面
// 数组大小必须在初始化的时候手动设置,没有默认大小
final Object[] items;
// 队头索引
int takeIndex;
// 队尾索引的下一个位置
int putIndex;
// 当前已有元素数量
int count;
// 可重入的锁
final ReentrantLock lock;
// take的队列
private final Condition notEmpty;
// put的队列
private final Condition notFull;
-
底层数据结构使用循环队列来进行完成(进行运算时需要对count进行取余操作)
1.2初始化源码解析
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
// 队列不为空 Condition,在 put 成功时使用
notEmpty = lock.newCondition();
// 队列不满 Condition,在 take 成功时使用
notFull = lock.newCondition();
}
可重入锁实现了公平,如果是公平锁,那么在锁竞争(获取锁)时,就会按照先来先到的顺序,如果是非公平锁,锁竞争时随机的。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
//设置初始化容量
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
ps:如果指定集合元素进行初始化,如果集合元素数大于设置的capacity会报错!
1.3新增数据源码解析
// 新增,如果队列满,无限阻塞
public void put(E e) throws InterruptedException {
// 元素不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// 可中断锁
try {
// 队列如果是满的,就无限等待,需要take线程进行唤醒
while (count == items.length)
notFull.await();
//元素入队
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
// 放入元素,putIndex指向队尾元素的下一个位置
items[putIndex] = x;
// 更新putIndex的位置,并且判断是否需要取余(直接赋值为0优化了取余)
if (++putIndex == items.length)
putIndex = 0;
// 更新元素数量
count++;
// 唤醒因为队列空而等待的线程
notEmpty.signal();
}
获取可中断锁,如果队列满那么就无限阻塞直到有take线程进行唤醒,如果队列不满那么就入队(直接在putIndex元素位置放入元素,然后优化了取余操作并且累加了putIndex非常优雅,然后更新队列数量,最后put线程会唤醒队列为空而阻塞的take线程)
ps:在这里,判断队列满没有浪费一个位置的空间,通过object数组的length属性可以很好地判断;
1.4拿数据源码解析
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();// 获取中断锁
try {
// 如果队列为空,无限等待,直到被put线程进行唤醒
while (count == 0)
notEmpty.await();
// 从队列中拿数据
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
// 获取元素
E x = (E) items[takeIndex];
// 帮助 gc
items[takeIndex] = null;
// ++ takeIndex 计算下次拿数据的位置,优化了取余操作
if (++takeIndex == items.length)
takeIndex = 0;
// 队列实际大小减 1
count--;
// 唤醒由于队列满所阻塞的put线程
notFull.signal();
return x;
}
take方法:获取可中断锁,如果队列为空就阻塞直到put线程进行唤醒,然后就进行出队操作,在释放锁
dequeue方法:获取到takeIndex队头元素,然后设置为null,将指针累加并且判断是否需要进行取余,在更新队列的元素数量,会唤醒由于队列满所阻塞的put线程
1.5删除数据
// 一共有两种情况:
// 1:删除位置和 takeIndex 一样
// 2:把要删除的元素右边的元素全部左移一位
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 情况1 如果删除位置正好等于下次要拿数据的位置
if (removeIndex == takeIndex) {
// 下次要拿数据的位置直接置空
items[takeIndex] = null;
// 要拿数据的位置往后移动一位
if (++takeIndex == items.length)
takeIndex = 0;
// 当前数组的大小减一
count--;
// 情况 2
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
// 找到要删除元素的下一个
int next = i + 1;
// 要注意需要判断next
if (next == items.length)
next = 0;
// 下一个元素不是 putIndex
if (next != putIndex) {
// 下一个元素往前移动一位
items[i] = items[next];
i = next;
// 下一个元素是 putIndex
} else {
// 删除元素
items[i] = null;
// 下次放元素时,应该从本次删除的元素放
this.putIndex = i;
break;
}
}
count--;
}
// 唤醒由于队列满而阻塞的put线程
notFull.signal();
}
如果删除的位置等于takeIndex(删除takeIndex位置的元素在后移一位),
如果删除的位置不相同从删除的位置开始将后半部分的元素全部向前移动一位(如果next元素为putIndex就将当前指针指向的元素删除然后设置为putIndex指针)
推荐阅读
-
Java队列之ArrayBlockingQueue源码解析
-
Java优先队列源码解析
-
《易道客》源码剖析之四:页面多次跳转的记忆 博客分类: 易道客 java易道客页面跳转开源框架
-
mybatis源码解析之拦截器 博客分类: mybatis mybatis代理
-
mybatis源码解析之mapper接口代理 博客分类: mybatis mybatis
-
mybatis源码解析之拦截器 博客分类: mybatis mybatis代理
-
mybatis源码解析之mapper接口代理 博客分类: mybatis mybatis
-
java数据结构与算法之双向循环队列的数组实现方法
-
java POI解析Excel 之数据转换公用方法(推荐)
-
java数据结构与算法之双向循环队列的数组实现方法