欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java阻塞队列

程序员文章站 2024-02-11 13:15:22
...

基本介绍:

队列是一种数据结构,它有两个基本操作:在队列尾部加入元素和从队列头部移除元素。在我们日常开发中,经常用来并发操作数据。java包中有一些应用比较广泛的特殊队列:

  • 一种是以ConcurrentLinkedQueue为代表的非阻塞队列;
  • 另一种是以BlockingQueue接口为代表的阻塞队列。
  • 通过这两种队列,我们保证了多线程操作数据的安全性。

队列的继承图:
java阻塞队列

从类图中可以得知:java集合中的Queue继承collection接口,Dueue、PriorityQueue、BlockingQueue等类都实现了它。

阻塞队列

阻塞队列是一个支持两个附加操作的队列:

  1. 在队列为空时,获取元素的线程会等待队列变为非空
  2. 当队列满时,存储元素的线程会等待队列可用
  3. 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被堵塞,除非有另一个线程做了出队列的操作;同样,当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另外一个线程进行了入队列的操作。

常见的阻塞队列应用就是生产者消费者模式。生产者把数据放到队列,如果队列满了,就会阻塞此操作,直到消费者消费,如果队列中数据被消费完,那么消费者被阻塞,直到生产者生产。

参考上面的类图:
在java包"java.util.concurrent"中,提供六个实现了"BlockingQueue"接口的阻塞队列。分别是ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue和LinkedBlockingDeque。实质上阻塞队列是一种特殊的FIFO数据结构,它不是立即从队列中添加或删除元素,而是等到有空间或者元素可用的时候才操作。下面分析下每种阻塞队列的实现方式和应用场景。

ArrayBlockingQueue

用数组实现的有界阻塞队列,默认情况下不保证线程公平的访问队列(按照阻塞的先后顺序访问队列),队列可用的时候,阻塞的线程都可以争夺队列的访问资格,当然也可以使用以下的构造方法创建一个公平的阻塞队列。ArrayBlockingQueue blockingQueue2 = new ArrayBlockingQueue<>(10, true)。(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。用ReentrantLock condition 实现阻塞。

有界就是队列的长度有限制,例如数组队列,在构建的时候就指定了长度。*就是可以无限地添加。

LinkedBlockingQueue

基于链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。这个队列的实现原理和ArrayBlockingQueue实现基本相同。也是采用ReentrantLock 控制并发,不同的是它使用两个独占锁来控制消费和生产。即用takeLock和putlock,这样的好处是消费者和生产者可以并发执行,对吞吐量有提升。

PriorityBlockingQueue

PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是*的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。也是用ReentrantLock控制并发

DelayQueue

DelayQueue是在PriorityQueue基础上实现的,底层也是数组构造方法,是一个存放Delayed 元素的*阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就移除这个元素了。此队列不允许使用 null 元素。

SynchronousQueue

一个没有容量的队列 ,不会存储数据,每执行一次put就要执行一次take,否则就会阻塞。未使用锁。通过cas实现,吞吐量异常高。内部采用的就是ArrayBlockingQueue的阻塞队列,所以在功能上完全可以用ArrayBlockingQueue替换,但是SynchronousQueue是轻量级的,SynchronousQueue不具有任何内部容量,我们可以用来在线程间安全的交换单一元素。所以功能比较单一,优势就在于轻量。
代码示例:

public class SynchronousQueueTest {
    public static void main(String[] args) {
        // 该队列没有容量,必须生产一个,取一个
        BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + " put a");
                    synchronousQueue.put("a");
                    System.out.println(Thread.currentThread().getName() + " put b");
                    synchronousQueue.put("b");
                    System.out.println(Thread.currentThread().getName() + " put b");
                    synchronousQueue.put("c");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + "取出:"+synchronousQueue.take());
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + "取出:"+synchronousQueue.take());
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + "取出:"+synchronousQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t2").start();
    }
}

运行结果:

t1 put a
t2取出:a
t1 put b
t2取出:b
t1 put b
t2取出:c

从结果可以看出,当添加一个元素后,如果没有take出该元素,那么就无法继续添加第2个元素。

查看源码,得知,线程会一直在该put方法里面执行for(;;)循环

LinkedBlockingDeque

LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全,当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。

阻塞队列常用方法

java阻塞队列
抛出异常组
使用add,remove方法往队列里面添加元素的时候如果添加的元素超过了队列的大小,或者移除队列元素的时候队列为空,就会抛出异常。代码如下

public class BlockingQueueTest {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
        blockingQueue.add("a");
        blockingQueue.add("b");
        blockingQueue.add("c");

        blockingQueue.element();
        
        blockingQueue.remove();
        blockingQueue.remove();
        blockingQueue.remove();
        blockingQueue.remove();
    }
}

运行结果:

Exception in thread "main" java.util.NoSuchElementException
	at java.util.AbstractQueue.remove(AbstractQueue.java:117)
	at com.fangyajun.javasduty.juc.queue.BlockingQueueTest.main(BlockingQueueTest.java:19)

当队列多移除元素,或者多添加元素的时候,使用该方法就出抛出异常。

特殊值组
使用offer方法往队列里面添加元素的时候如果添加的元素返回boolean值,表示添加失败或者成功.
使用poll移除队列元素的时候,返回移除当前的元素,当队列为空的时候,返回null.
peek方法表查看元素,不会从队列里面取出。

   public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("d"));

        System.out.println("-----使用peek查看队列头元素-----");
        System.out.println(blockingQueue.peek());
        System.out.println("-----使用poll移除队列元素-----");
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());

        System.out.println("-----当队列为空时使用peek查看队列头元素-----");
        System.out.println(blockingQueue.peek());
    }
}

运行结果:

true
true
true
false
-----使用peek查看队列头元素-----
a
-----使用poll移除队列元素-----
a
b
c
null
null
-----当队列为空时使用peek查看队列头元素-----
null

从结果可以看出,使用offer添加元素的时候,返回boolean表示插入成功或者失败,使用poll移除元素的时候返回当前移除的元素,当队列为空的时候返回null。

阻塞组
当使用put,take添加或者移除元素的时候,添加元素的时候,如果队列已满,则线程一直阻塞直到put成功或者响应失中断退出。当队列为空的时候,消费者试图从队列里面take数据,队列会一直阻塞消费者线程直到队列可用。

public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        blockingQueue.put("d");
        System.out.println("-------------------");
        blockingQueue.take();
        blockingQueue.take();
        blockingQueue.take();
        blockingQueue.take();
        blockingQueue.take();
    }
}

运行结果是程序会一直等待,没有中断

超时退出组
当线程队列满时,往队列添加元素,地漏会阻塞生产者线程一定时间,超过限时后,生产者线程退出

public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
        blockingQueue.offer("a", 2, TimeUnit.SECONDS);
        blockingQueue.offer("b", 2, TimeUnit.SECONDS);
        blockingQueue.offer("c", 2, TimeUnit.SECONDS);
        System.out.println("队列已满插入第4个元素:");
        blockingQueue.offer("a", 2, TimeUnit.SECONDS);
    }
}

运行结果:当插入第4个元素的时候,线程会阻塞2秒,没有插入成功退出。

true
true
true
队列已满插入第4个元素:
false
相关标签: JUC并发