欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

深入浅出阻塞队列原理

程序员文章站 2024-01-13 18:36:34
...

一、阻塞队列定义

阻塞队列(BlockingQueue)是一种可以在多线程环境下使用,并且支持阻塞等待的队列。支持在队列为空时,获取元素的线程会等待队列变为非空,当队列满时,存储元素的线程会等待队列可用

二、jdk提供的阻塞队列

  1. ArrayBlockingQueue:
    基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列

  2. LinkedBlockingQueue:
    基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE

  3. PriorityBlockingQueue:
    以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为*阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列

  4. DelayQueue:
    基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个*队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞

  5. SynchronousQueue:
    SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素

  6. LinkedTransferQueue:
    一个由链表结构组成的*阻塞队

  7. LinkedBlockingDeque:
    一个由链表结构组成的双向阻塞队列

三、特有方法

非阻塞队列方法:

  • add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;

  • remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;

  • offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;

  • poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;

  • peek():获取队首元素,若成功,则返回队首元素;否则返回null

阻塞队列特有方法:

  • put(E e)
    向队尾存入元素,如果队列满,则等待

  • take()
    从队首取元素,如果队列为空,则等待

  • offer(E e,long timeout, TimeUnit unit)
    向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true

  • poll(long timeout, TimeUnit unit)
    从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素

四、实现原理

采用生产消费模型

这里以ArrayBlockingQueue为例:

4.1 基本属性:
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
4.2 构造函数
//队列长度
public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

//指定队列长度和是否公平锁    
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

ArrayBlockingQueue需要指定三个构造参数:

  • 队列容量,其限制了队列中最多允许的元素个数
  • 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
  • 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中
4.3 核心方法分析
 public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加可中断锁
        lock.lockInterruptibly();
        try {
        	//数组占满,等待
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

//入队操作
private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        //插入位置后移1位,到达数组总长度重新拨回到起点
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

//取数据
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
        	//为空则等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

 //出队操作
 private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        //索引后移,达到数组长度后回到起点
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

ArrayBlockingQueue并发同步的原理是读操作和写操作都需要获取到 AQS 独占锁才能进行操作,如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。

五、总结

阻塞队列帮我们封装好了生产消费模型, 我们可以很方便的使用它们,比如线程池中的阻塞队列。SynchronousQueue在ScheduledThreadPoolExecutor 中得到应用,而PriorityBlockingQueue是基于堆排序实现的,后面会详细分析

相关标签: 阻塞队列