欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java源码:阻塞队列(ArrayBlockingQueue)

程序员文章站 2022-04-20 23:09:37
...

一、简介

所谓阻塞队列,其实就是支持下面这两种阻塞功能的队列:

  • 当队列为空时,读取该队列可以阻塞直到队列不为空;
  • 当队列已满时,写入该队列可以阻塞直到队列不为满;

这种阻塞队列主要用于可以用来构建生产者-消费者模型,生产者只需要往队列中发送消息,而消费者也只需要专注于从队列中读取消息,剩下的同步、阻塞细节都交给阻塞队列把。


Java提供了下面7种阻塞队列,区别于底层数据结构的不同:

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的*阻塞队列。
  • DelayQueue:一个使用优先级队列实现的*阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的*阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
阻塞队列的接口是:java.util.concurrent.BlockingQueue,主要提供了以下存取方法(根据队列空或者满时的响应方式,可分成3类):
  立即返回结果值 超时返回结果值 阻塞 抛出异常
插入 offer(e) offer(e,time,unit) put(e) add(e)
移除 poll() poll(time,unit) take() remove()
读取 peek() element()

二、源码

  • ArrayBlockingQueue

    下面以ArrayBlockingQueue为例看看JDK的源码,其他的实现类,有先看一下它的主要属性:
        public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
                implements BlockingQueue<E>, java.io.Serializable{  
            /** 底层用于存放队列元素的数组 */  
            final Object[] items;  
          
            /** 下一个获取索引,take,poll,peek和remove会用到 */  
            int takeIndex;  
          
            /** 下一个插入索引,put,offer和add方法会用到 */  
            int putIndex;  
          
            /** 当前队列中元素的个数 */  
            int count;  
          
            /** 用于控制并发操作队列的锁对象 */  
            final ReentrantLock lock;  
            /** 队列为非空的条件对象,用于唤醒阻塞中的读操作 */  
            private final Condition notEmpty;  
            /** 队列为非满的条件对象,用于唤醒阻塞中的写操作 */  
            private final Condition notFull;  
        }  
  • 写入

    1. offer(e)与offer(e,time,unit)
    首先是offer(e),逻辑比较简单,上锁、判断插入(不满则插入,满了则返回)、解锁。
        public boolean offer(E e) {  
            if (e == null) throw new NullPointerException();  
            final ReentrantLock lock = this.lock;  
            //锁住队列,防止在插入过程中的并发读写  
            lock.lock();  
            try {  
                //满了,返回false  
                if (count == items.length)  
                    return false;  
                else {  
                    //执行插入  
                    insert(e);  
                    return true;  
                }  
            } finally {  
                //释放锁  
                lock.unlock();  
            }  
        }  

    而offer(e,time,unit)相比offer(e)则多了阻塞给定时间的功能,注意下面的无条件for循环是为了防止多个线程同时被唤醒操作队列,因此每次都需要判断队列是否已满,是的话继续阻塞:
        public boolean offer(E e, long timeout, TimeUnit unit)  
                throws InterruptedException {  
          
            if (e == null) throw new NullPointerException();  
                long nanos = unit.toNanos(timeout);  
            //使用lockInterruptibly锁住队列,可以被中断  
            final ReentrantLock lock = this.lock;  
            lock.lockInterruptibly();  
            try {  
                for (;;) {  
                    //如果队列不满,则执行插入并返回true  
                    if (count != items.length) {  
                        insert(e);  
                        return true;  
                    }  
          
                    //如果nanos<=0,说明已经阻塞超过了给定时间了,直接返回false  
                    if (nanos <= 0)  
                        return false;  
                    try {  
                        //若该条件没被唤醒或者该线程没被中断,等待给定时间  
                        //如果等待中被唤醒,返回剩余的等待时间  
                        nanos = notFull.awaitNanos(nanos);  
                    } catch (InterruptedException ie) {  
                        notFull.signal(); // propagate to non-interrupted thread  
                        throw ie;  
                    }  
                }  
            } finally {  
                //释放锁  
                lock.unlock();  
            }  
        }  

    看到两个方法都是调用的insert(e)执行实际的插入,insert方法也比较简单,插入、非空唤醒
        private void insert(E x) {  
            items[putIndex] = x;  
            putIndex = inc(putIndex);  
            ++count;  
            notEmpty.signal();  
        }  

    2.add(e)
    add方法其实是调用了父类AbstractQueue的add方法:
        public boolean add(E e) {  
            if (offer(e))  
                return true;  
            else  
                throw new IllegalStateException("Queue full");  
        }  
    很简单,其实就是通过offer判断当前队列是否满了,是就立刻抛出异常。

    3. put(e) 其实put就相当与无限阻塞的offer(e,time,unit),也是在无限循环里面判断队列是否已满并插入,否则阻塞:
        public void put(E e) throws InterruptedException {  
            if (e == null) throw new NullPointerException();  
            final E[] items = this.items;  
            final ReentrantLock lock = this.lock;  
            lock.lockInterruptibly();  
            try {  
                try {  
                    //每次被唤醒都判断一下队列是否满了,是则继续阻塞  
                    while (count == items.length)  
                        notFull.await();  
                } catch (InterruptedException ie) {  
                    notFull.signal(); // propagate to non-interrupted thread  
                    throw ie;  
                }  
                insert(e);  
            } finally {  
                lock.unlock();  
            }  
        }  
  • 移除

    读取操作和写入其实大同小异,底层和核心逻辑都差不多,如果理解了上面关于读取的几个方法的话,读取就无需过多解释了。
    1.poll()与poll(time,unit)
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == 0)
                    return null;
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }
    
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //无限循环,确保每次唤醒都要判断当前是否为空
                for (; ; ) {
                    if (count != 0) {
                        E x = extract();
                        return x;
                    }
                    if (nanos <= 0)
                        return null;
                    try {
                        nanos = notEmpty.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        notEmpty.signal(); // propagate to non-interrupted thread
                        throw ie;
                    }
    
                }
            } finally {
                lock.unlock();
            }
        }
    
        private E extract() {
            //读取队列头元素并唤醒都有等待非满线程
            final E[] items = this.items;
            E x = items[takeIndex];
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return x;
        }

    2.take()
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    //为空的情况下,无限阻塞
                    while (count == 0)
                        notEmpty.await();
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }

    3.remove()
        public boolean remove(Object o) {
            if (o == null) return false;
            final E[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //遍历队列中的元素,使用equals方法判定相等则移除
                int i = takeIndex;
                int k = 0;
                for (;;) {
                    if (k++ >= count)
                        return false;
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    i = inc(i);
                }
    
            } finally {
                lock.unlock();
            }
        }
  • 读取

    首先说明一下,这下面的读取方法都是指读取队列头部的元素,因为如果构建消息队列,都是尾部插入,头部读取。读取的两个方法的核心逻辑其实都在peek()里:上锁 - 读取 - 解锁:
        public E element() {
            E x = peek();
            if (x != null)
                return x;
            else
                throw new NoSuchElementException();
        }
    
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //takeIndex会一直指向队列的头
                return (count == 0) ? null : items[takeIndex];
            } finally {
                lock.unlock();
            }
        }
以上就是我对ArrayBlockingQueue的理解,水平有限,恳请指教。