Java源码:阻塞队列(ArrayBlockingQueue)
程序员文章站
2022-04-20 23:09:37
...
一、简介
所谓阻塞队列,其实就是支持下面这两种阻塞功能的队列:
- 当队列为空时,读取该队列可以阻塞直到队列不为空;
- 当队列已满时,写入该队列可以阻塞直到队列不为满;
这种阻塞队列主要用于可以用来构建生产者-消费者模型,生产者只需要往队列中发送消息,而消费者也只需要专注于从队列中读取消息,剩下的同步、阻塞细节都交给阻塞队列把。
Java提供了下面7种阻塞队列,区别于底层数据结构的不同:
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的*阻塞队列。
- DelayQueue:一个使用优先级队列实现的*阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的*阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
立即返回结果值 | 超时返回结果值 | 阻塞 | 抛出异常 | |
插入 | offer(e) | offer(e,time,unit) | put(e) | add(e) |
移除 | poll() | poll(time,unit) | take() | remove() |
读取 | peek() | 无 | 无 | element() |
二、源码
-
ArrayBlockingQueue
下面以ArrayBlockingQueue为例看看JDK的源码,其他的实现类,有先看一下它的主要属性:public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable{ /** 底层用于存放队列元素的数组 */ final Object[] items; /** 下一个获取索引,take,poll,peek和remove会用到 */ int takeIndex; /** 下一个插入索引,put,offer和add方法会用到 */ int putIndex; /** 当前队列中元素的个数 */ int count; /** 用于控制并发操作队列的锁对象 */ final ReentrantLock lock; /** 队列为非空的条件对象,用于唤醒阻塞中的读操作 */ private final Condition notEmpty; /** 队列为非满的条件对象,用于唤醒阻塞中的写操作 */ private final Condition notFull; }
-
写入
1. offer(e)与offer(e,time,unit)
首先是offer(e),逻辑比较简单,上锁、判断插入(不满则插入,满了则返回)、解锁。public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; //锁住队列,防止在插入过程中的并发读写 lock.lock(); try { //满了,返回false if (count == items.length) return false; else { //执行插入 insert(e); return true; } } finally { //释放锁 lock.unlock(); } }
而offer(e,time,unit)相比offer(e)则多了阻塞给定时间的功能,注意下面的无条件for循环是为了防止多个线程同时被唤醒操作队列,因此每次都需要判断队列是否已满,是的话继续阻塞:public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); //使用lockInterruptibly锁住队列,可以被中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //如果队列不满,则执行插入并返回true if (count != items.length) { insert(e); return true; } //如果nanos<=0,说明已经阻塞超过了给定时间了,直接返回false if (nanos <= 0) return false; try { //若该条件没被唤醒或者该线程没被中断,等待给定时间 //如果等待中被唤醒,返回剩余的等待时间 nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { //释放锁 lock.unlock(); } }
看到两个方法都是调用的insert(e)执行实际的插入,insert方法也比较简单,插入、非空唤醒private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
2.add(e)
add方法其实是调用了父类AbstractQueue的add方法:public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
很简单,其实就是通过offer判断当前队列是否满了,是就立刻抛出异常。
3. put(e) 其实put就相当与无限阻塞的offer(e,time,unit),也是在无限循环里面判断队列是否已满并插入,否则阻塞:public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { //每次被唤醒都判断一下队列是否满了,是则继续阻塞 while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
-
移除
读取操作和写入其实大同小异,底层和核心逻辑都差不多,如果理解了上面关于读取的几个方法的话,读取就无需过多解释了。
1.poll()与poll(time,unit)public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //无限循环,确保每次唤醒都要判断当前是否为空 for (; ; ) { if (count != 0) { E x = extract(); return x; } if (nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } } private E extract() { //读取队列头元素并唤醒都有等待非满线程 final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
2.take()public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { //为空的情况下,无限阻塞 while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
3.remove()public boolean remove(Object o) { if (o == null) return false; final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //遍历队列中的元素,使用equals方法判定相等则移除 int i = takeIndex; int k = 0; for (;;) { if (k++ >= count) return false; if (o.equals(items[i])) { removeAt(i); return true; } i = inc(i); } } finally { lock.unlock(); } }
-
读取
首先说明一下,这下面的读取方法都是指读取队列头部的元素,因为如果构建消息队列,都是尾部插入,头部读取。读取的两个方法的核心逻辑其实都在peek()里:上锁 - 读取 - 解锁:public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //takeIndex会一直指向队列的头 return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } }
上一篇: React中ES5与ES6写法的区别总结
下一篇: Heasian与Spring集成