欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JAVA并发:阻塞队列

程序员文章站 2024-02-11 12:54:04
...

在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁) 等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。

下面分析的是阻塞队列。

1.什么是阻塞队列

阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  • 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。

  • 支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素,消费者则从队列里取元素。

2.队列Queue接口核心方法

先简单介绍下JAVA容器中的队列有哪些接口及使用方法:

JAVA并发:阻塞队列

3.阻塞队列BlockigQueue接口核心方法

阻塞队列,本质上来说还是属于队列,也就是说阻塞队列继承了队列的功能。

JAVA并发:阻塞队列

4.常用的阻塞队列

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列

  • LinkedBlockingQueue:由链表结构组成的有界阻塞队列

  • PriorityBlockingQueue:支持优先级排序的*阻塞队列

  • DelayQueue:使用优先级队列实现的*阻塞队列

  • SynchronousQueue:不存储元素的阻塞队列

  • LinkedTransferQueue:由链表结构组成的*阻塞队列

  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

以下逐一介绍这7种阻塞队列

4.1ArrayBlockingQueue(有界阻塞队列)

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下采用非公平锁的方式实现,可以通过构造器传参控制是采用公平锁还是非公平锁实现。先看看ArrayBlockingQueue类图关系:

JAVA并发:阻塞队列

4.1.1阻塞队列的使用

模拟生产者与消费者

private static void consumeDemo() throws InterruptedException {
    Thread consume = new Thread(() -> {
        while (true) {
            try {
                Integer a = consumeQueue.take();
                System.out.println("消费:" + a);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    Thread product = new Thread(() -> {
        for (int i = 0; i < 100; i++) {
            try {
                consumeQueue.put(i);
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    consume.start();
    TimeUnit.SECONDS.sleep(5);
    product.start();
}

4.1.2初始化队列

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    // 初始化Object数组
    this.items = new Object[capacity];
    // 初始化锁(可以指定公平与非公平)
    lock = new ReentrantLock(fair);
    // 非空condition队列,队列空时用于阻塞获取元素的线程
    notEmpty = lock.newCondition();
    // 非满condition队列,队列满时用于阻塞添加元素的线程
    notFull =  lock.newCondition();
}

4.1.3添加元素

add、offer、put三个方法都是添加元素,但是各自的具体实现又不一样,各自的使用场景也不相同。这三个方法在添加元素的时候都会先获得Lock锁,添加成功后释放锁。这里主要介绍put方法

public void put(E e) throws InterruptedException {
    // 检查添加的元素不为空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // 加锁,可异常中断
    lock.lockInterruptibly();
    try {
        // 队列满了
        while (count == items.length)
            // 阻塞继续添加元素,直到唤醒
            // 队列满了之后,当前线程添加元素会阻塞在此位置并释放锁。只有等其他线程移除元素后唤醒当前线程。才可以继续添加元素
            notFull.await();
        // 队列中元素没满,继续添加元素
        enqueue(e);
    } finally {
        // 释放锁
        lock.unlock();
    }
}
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // putIndex维护当前元素添加到了哪个位置
    items[putIndex] = x;
    // 
    if (++putIndex == items.length)
        // 重新从0开始计数,前面队列满了会被阻塞,而至少移除一个元素之后才会唤醒添加元素。
        // 而移除元素也是从头开始移除的,所以不会覆盖没有消费的元素
        putIndex = 0;
    // 元素个数+1
    count++;
    // 唤醒获取元素的线程
    // 获取元素时,有可能队列为空。会阻塞再次等待
    notEmpty.signal();
}

4.1.4获取元素

remove、poll、take三个方法都是获取元素,但是各自的具体实现又不一样,各自的使用场景也不相同。这三个方法在获取元素的时候都会先获得Lock锁,获取成功后释放锁。这里主要介绍take方法

peek与element方法与上面的方法又不相同,只会读取元素而不会把他们从队列中移除。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁,可异常中断
    lock.lockInterruptibly();
    try {
        // 队列为空
        while (count == 0)
            // 阻塞当前线程获取元素
            // 当队列中添加元素时,会唤醒此线程继续去获取队列中的元素
            notEmpty.await();
        // 队列不为空,获取元素
        return dequeue();
    } finally {
        // 释放锁
        lock.unlock();
    }
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // takeIndex维护当前移除元素的位置
    E x = (E) items[takeIndex];
    // 移除元素所在的位置设置为null
    items[takeIndex] = null;
    // 
    if (++takeIndex == items.length)
        // 如果移除元素已经到队列尾部
        // 从0开始去获取元素
        takeIndex = 0;
    // 队列中的元素个数 - 1
    count--;
    // 针对使用迭代器得遍历
    if (itrs != null)
        itrs.elementDequeued();
    // 移除元素之后唤醒添加元素的线程
    notFull.signal();
    return x;
}

使用take方法去阻塞获取元素,当阻塞时使用add方法也会继续take读取,只要是添加元素就会signal

4.2 LinkedBlockingQueue(有界阻塞队列)

由链表结构组成的有界阻塞队列,遵循先进先出(FIFO)的原则,和ArrayBlockingQueue的区别是ArrayBlockingQueue内部维护的是一个数组,通过数组下标来维护队列,而LinkedBlockingQueue维护的是一个链表,通过Node来维护队列。同时有两把锁(入队和出队用不同的锁),原子个数count是使用AtomicInteger修饰。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bPO7qlYr-1606406486535)(C:/Users/scarecrow/AppData/Roaming/Typora/typora-user-images/image-20201125221854821.png)]

4.2.1 初始化队列

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue(int capacity) {
    // 容量<=0,直接异常
    if (capacity <= 0) throw new IllegalArgumentException();
    // 初始化容量
    this.capacity = capacity;
    // 初始化一个队列,head与last节点为相同的node节点
    last = head = new Node<E>(null);
}

// Node节点,含有下一个节点的引用
static class Node<E> {
    E item;
    
    Node<E> next;

    Node(E x) { item = x; }
}

JAVA并发:阻塞队列

4.2.2 添加元素

public void put(E e) throws InterruptedException {
    // 元素为空,异常
    if (e == null) throw new NullPointerException();
    int c = -1;
    // 初始化Node
    Node<E> node = new Node<E>(e);
    // 获得put锁
    final ReentrantLock putLock = this.putLock;
    // 队列的大小,AtomicInteger原子类,并发安全的。
    final AtomicInteger count = this.count;
    // 加锁
    putLock.lockInterruptibly();
    try {
        // 队列满了
        while (count.get() == capacity) {
            // 阻塞其他线程继续添加元素
            notFull.await();
        } 
        // 队列没有满,将Node加入队列
        enqueue(node);
        // 获取count的值,并同时+1
        c = count.getAndIncrement();
        // 如果队列没有满(count还是原队列的大小)c + 1因为此线程put元素加入队列成功。
        if (c + 1 < capacity)
            // 唤醒 其他阻塞添加元素 的线程
            notFull.signal();
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // 说明在添加元素之前,队列是空的。有可能其他线程在获取元素,此时正在被阻塞。这里需要唤醒
    if (c == 0)
        // 唤醒获取元素的线程
        signalNotEmpty();
}
private void enqueue(Node<E> node) {
    // 将添加的元素封装为node加入队列中。head指向空信息节点,不是具体添加的元素 ,但是它拥有添加元素的next引用
    last = last.next = node;
}
private void signalNotEmpty() {
    // 唤醒获取元素的线程
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

JAVA并发:阻塞队列

4.3.3获取元素

public E take() throws InterruptedException {
    E x;
    int c = -1;
    // 队列的大小
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lockInterruptibly();
    try {
        // 队列为空,获取元素的线程阻塞
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 把元素移除队列
        x = dequeue();
        // 先获得移除元素之前队列的大小,之后把值更新为 - 1
        c = count.getAndDecrement();
        // 队列中还存在元素,可以唤醒其他阻塞获取元素的线程去获得元素
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放锁
        takeLock.unlock();
    }
    // 队列满时,移除一个后需要唤醒加入元素的线层去添加元素
    // c == capacity 为移除元素之前的容量,移除后,队列肯定不是满的
    if (c == capacity)
        signalNotFull();
    return x;
}
// 把head节点的next节点设置为head,并返回其value值后把item设置为null
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

JAVA并发:阻塞队列

4.3LinkedBlockingDeque(有界阻塞双向队列)

LinkedBlockingDeque和LinkedBlockingQeque一样是由链表结构组成,但是LinkedBlockingDeque是个双向链表,比LinkedBlockingQeque多一个pre节点的引用。

双向队列因为多了一个操作队列的入口,所以相比较于LinkedBlockingQeque单向队列中多了addFirst、 addLast、offerFirst、offerLast、peekFirst、peekLast、putFirst、putLast、takeFirst、takeLast等方法。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst,而take方法却等同于takeFirst,使用时需要注意。

JAVA并发:阻塞队列

4.3.1初始化队列

transient Node<E> first;

transient Node<E> last;

/** Number of items in the deque */
private transient int count;

/** Maximum number of items in the deque */
private final int capacity;

/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();

/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();

/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();

// 初始化的时候没有设置任何节点,仅仅只是设置了一个容量
public LinkedBlockingDeque(int capacity) {
    // 初始化容量<=0,抛异常
    if (capacity <= 0) throw new IllegalArgumentException();
    // 设置队列的容量
    this.capacity = capacity;
}

static final class Node<E> {
    E item;
	// prev节点
    Node<E> prev;
	// next节点
    Node<E> next;

    Node(E x) {
        item = x;
    }
}

4.3.2添加元素

4.3.2.1 putFirst

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 队列满了,阻塞当前线程添加元素
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}
// 把当前节点,加入链表的首个位置
private boolean linkFirst(Node<E> node) {
    if (count >= capacity)
        return false;
    Node<E> f = first;
    node.next = f;
    first = node;
    if (last == null)
        last = node;
    else
        f.prev = node;
    ++count;
    notEmpty.signal();
    return true;
}

4.3.2.1 putLast

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}
private boolean linkLast(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity)
        return false;
    Node<E> l = last;
    node.prev = l;
    last = node;
    if (first == null)
        first = node;
    else
        l.next = node;
    ++count;
    notEmpty.signal();
    return true;
}

4.3.3移除元素

4.3.3.1takeFirst

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}
private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    Node<E> f = first;
    if (f == null)
        return null;
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    if (n == null)
        last = null;
    else
        n.prev = null;
    --count;
    notFull.signal();
    return item;
}

4.3.3.2takeLast

public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkLast()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}
private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    Node<E> l = last;
    if (l == null)
        return null;
    Node<E> p = l.prev;
    E item = l.item;
    l.item = null;
    l.prev = l; // help GC
    last = p;
    if (p == null)
        first = null;
    else
        p.next = null;
    --count;
    notFull.signal();
    return item;
}

JAVA并发:阻塞队列

4.4PriorityBlockingQueue(*阻塞队列)

PriorityBlockingQueue是一个支持优先级的*阻塞队列(大小受限于内存)。和前面介绍的三种有界队列相比,*队列的最大区别是即使初始化的时候指定了长度,那么当队列元素达到上限后队列也会自动进行扩容,所以PriorityBlockingQueue在添加元素的时候不会发生阻塞,而如果扩容后的大小超过了内存限制,会抛出OutOfMemoryError错误。

默认情况下PriorityBlockingQueue队列元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者在初始化时,可以指定构造参数Comparator来对元素进行排序。
注意:PriorityBlockingQueue不能保证相同优先级元素的顺序(即两个值排序一样时,不保证顺序)。类图如下:

JAVA并发:阻塞队列

可以看到提供了4个 构造器:

  • PriorityBlockingQueue():
    初始化一个默认大小(11)长度的队列,并使用默认自然排序。
  • PriorityBlockingQueue(int):
    初始化一个指定大小的长度的队列,并使用默认自然排序。
  • PriorityBlockingQueue(int,Comparator):
    初始化一个指定大小的队列,并按照指定比较器进行排序。
  • PriorityBlockingQueue(Collection):
    根据传入的集合进行初始化并堆化,如果当前集合是SortedSet或者PriorityBlockingQueue类型,则保持原有顺序,否则使用自然排序进行堆化。

4.4.1初始化

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

我们看到只有一个Condition队列,这个是用来阻塞出队线程的,入队线程不会被阻塞。

接下来我们主要看看第4个构造器,是如何初始化一个队列的:

public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    // true表示需要堆化即需要重排序
    boolean heapify = true; // true if not known to be in heap order
    // true表示需要筛选空值
    boolean screen = true;  // true if must screen for nulls
    // 如果集合是SortedSet类型则不需要堆化
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    }
    // 如果集合是PriorityBlockingQueue类型则不需要筛选空值
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        // 如果pq就是一个PriorityBlockingQueue则不需要堆化
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false;
    }
    // 把集合转为
    Object[] a = c.toArray();
    // 集合的长度
    int n = a.length;
    // //如果c.torray()失败,重新复制一个数组
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)
        // 堆化(排序)
        heapify();
}
相关标签: JAVA并发编程