欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java阻塞队列之ArrayBlockingQueue源码解析

程序员文章站 2024-03-18 09:11:28
...

阻塞队列概要

阻塞队列(java.util.concurrent.BlockingQueue),顾名思义,是一种特殊的队列,它的特性在于支持阻塞的添加和删除元素操作。

在JDK中其接口定义如下:

public interface BlockingQueue<E> extends Queue<E> {
	
    // 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。
    boolean add(E e);

    // 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。
    boolean offer(E e);

    // 将指定元素插入此队列中,将等待可用的空间(如果有必要)。
    void put(E e) throws InterruptedException;

    // 将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间(如果有必要)。
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
    E take() throws InterruptedException;

    // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
	
    // 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的附加元素数量;如果没有内部限制,则返回 Integer.MAX_VALUE。
    int remainingCapacity();

    // 从此队列中移除指定元素的单个实例(如果存在)。
    boolean remove(Object o);

    // 如果此队列包含指定元素,则返回 true。
    public boolean contains(Object o);

    // 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
    int drainTo(Collection<? super E> c);

    // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
    int drainTo(Collection<? super E> c, int maxElements);
}

可见该接口是Queue接口的一个子接口,除了继承自Queue的方法,它还提供了阻塞式的插入、删除元素的方法。

BlockingQueue的实现类

在JDK中BlockingQueue有多种实现,主要有:

  • ArrayBlockingQueue:基于数组实现的BlockingQueue,它是一种有界阻塞队列。
  • LinkedBlockingQueue:基于链表实现的BlockingQueue,它也是一种有界阻塞队列。
  • PriorityBlockingQueue:一种优先级队列,它是*阻塞队列,支持传入Comparator对插入的元素进行排序。
  • ......

ArrayBlockingQueue源码解析

ArrayBlockingQueue实现概要

如前面所说,ArrayBlockingQueue是一种基于数组实现的有界阻塞队列,其构造函数定义如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // 存储数据的数组。
    final Object[] items;

    // 获取数据的索引。
    int takeIndex;

    // 插入数据的索引
    int putIndex;

    // 队列元素数目。
    int count;

    // 控制并发访问的锁。
    final ReentrantLock lock;

    // 队列非空条件对象。
    private final Condition notEmpty;

    // 队列未满条件对象。
    private final Condition notFull;

    /**
     * Shared state for currently active iterators, or null if there
     * are known not to be any.  Allows queue operations to update
     * iterator state.
     */
    transient Itrs itrs = null;

    // 创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue。
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    // 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue。
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    // 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }
}

从类成员上我们可以看到,ArrayBlockingQueue通过items数组来存储队列元素数据。同时通过重入锁进行并发控制。它通过takeIndex、putIndex记录当前取、插元素的位置,通过count记录队列的size。它通过notEmpty、notFull条件对象,在取数据、插数据时进行线程的阻塞和唤醒控制。

ArrayBlockingQueue方法解析

put(E e)方法

put(E e)方法将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。其实现如下:

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

首先是前置校验,如果插入的元素为null则抛出NullPointerException。

接下来因为要插入数据,所以先获取锁,如果当前队列size已经等于items数组的长度,即capacity,则说明队列已满,需要等待,具体做法便是在notFull条件对象上等待,等待时释放lock,当其他线程执行了take、poll等操作时,会调用notFull的signal,该put线程便得到机会可以继续执行。

然后便是enqueue操作,这里是把数据插入数组,根据putIndex得知该插入的位置,然后插入,同时更新putIndex,队列size++,调用notEmpty条件对象的signal,告诉在notEmpty上等待的线程,队列已经不为空了。

take()方法

take()方法获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。其实现如下:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

同样,因为要对临界资源items数组进行修改,所以take方法上来也是先获取锁。

然后它判断当前size是否为0,如果为0说明队列中没有元素,需要等待,具体做法便是在notEmpty条件对象上等待,已期其他线程执行put、offer等操作时能够唤醒它。

被唤醒后当前take线程得以继续执行,执行dequeue操作,将队首元素从数组中剔除。具体做法是根据takeIndex得知要被取走的元素的数组下标,然后该位置置null以便GC线程能回收此对象。然后更新takeIndex,size--,更新迭代器中的元素数据,调用notFull的signal方法,通知在notFull上等待的线程,当前队列已经不满了,可以插入元素了。

offer(E e, long timeout, TimeUnit unit)方法

offer(E e, long timeout, TimeUnit unit)方法将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间(如果有必要)。其实现如下:

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

该方法实现与put(E e)非常类似,只不过在等待的时候调用了超时等待方法,代码结构上遵循condition.awaitNanos(long nanosTimeout)的典型编程模式。

poll(long timeout, TimeUnit unit)方法

poll(long timeout, TimeUnit unit)方法获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。其实现如下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

同样,这里的实现与take()方法类似,等待时使用了超时等待方法,代码结构上遵循condition.awaitNanos(long nanosTimeout)的典型编程模式。