java阻塞队列
基本介绍:
队列是一种数据结构,它有两个基本操作:在队列尾部加入元素和从队列头部移除元素。在我们日常开发中,经常用来并发操作数据。java包中有一些应用比较广泛的特殊队列:
- 一种是以ConcurrentLinkedQueue为代表的非阻塞队列;
- 另一种是以BlockingQueue接口为代表的阻塞队列。
- 通过这两种队列,我们保证了多线程操作数据的安全性。
队列的继承图:
从类图中可以得知:java集合中的Queue继承collection接口,Dueue、PriorityQueue、BlockingQueue等类都实现了它。
阻塞队列
阻塞队列是一个支持两个附加操作的队列:
- 在队列为空时,获取元素的线程会等待队列变为非空;
- 当队列满时,存储元素的线程会等待队列可用。
- 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被堵塞,除非有另一个线程做了出队列的操作;同样,当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另外一个线程进行了入队列的操作。
常见的阻塞队列应用就是生产者消费者模式。生产者把数据放到队列,如果队列满了,就会阻塞此操作,直到消费者消费,如果队列中数据被消费完,那么消费者被阻塞,直到生产者生产。
参考上面的类图:
在java包"java.util.concurrent"中,提供六个实现了"BlockingQueue"接口的阻塞队列。分别是ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue和LinkedBlockingDeque。实质上阻塞队列是一种特殊的FIFO数据结构,它不是立即从队列中添加或删除元素,而是等到有空间或者元素可用的时候才操作。下面分析下每种阻塞队列的实现方式和应用场景。
ArrayBlockingQueue
用数组实现的有界阻塞队列,默认情况下不保证线程公平的访问队列(按照阻塞的先后顺序访问队列),队列可用的时候,阻塞的线程都可以争夺队列的访问资格,当然也可以使用以下的构造方法创建一个公平的阻塞队列。ArrayBlockingQueue blockingQueue2 = new ArrayBlockingQueue<>(10, true)。(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。用ReentrantLock condition 实现阻塞。
有界就是队列的长度有限制,例如数组队列,在构建的时候就指定了长度。*就是可以无限地添加。
LinkedBlockingQueue
基于链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。这个队列的实现原理和ArrayBlockingQueue实现基本相同。也是采用ReentrantLock 控制并发,不同的是它使用两个独占锁来控制消费和生产。即用takeLock和putlock,这样的好处是消费者和生产者可以并发执行,对吞吐量有提升。
PriorityBlockingQueue
PriorityBlockingQueue是一个带优先级的队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是*的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。也是用ReentrantLock控制并发
DelayQueue
DelayQueue是在PriorityQueue基础上实现的,底层也是数组构造方法,是一个存放Delayed 元素的*阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就移除这个元素了。此队列不允许使用 null 元素。
SynchronousQueue
一个没有容量的队列 ,不会存储数据,每执行一次put就要执行一次take,否则就会阻塞。未使用锁。通过cas实现,吞吐量异常高。内部采用的就是ArrayBlockingQueue的阻塞队列,所以在功能上完全可以用ArrayBlockingQueue替换,但是SynchronousQueue是轻量级的,SynchronousQueue不具有任何内部容量,我们可以用来在线程间安全的交换单一元素。所以功能比较单一,优势就在于轻量。
代码示例:
public class SynchronousQueueTest {
public static void main(String[] args) {
// 该队列没有容量,必须生产一个,取一个
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " put a");
synchronousQueue.put("a");
System.out.println(Thread.currentThread().getName() + " put b");
synchronousQueue.put("b");
System.out.println(Thread.currentThread().getName() + " put b");
synchronousQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "取出:"+synchronousQueue.take());
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "取出:"+synchronousQueue.take());
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "取出:"+synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2").start();
}
}
运行结果:
t1 put a
t2取出:a
t1 put b
t2取出:b
t1 put b
t2取出:c
从结果可以看出,当添加一个元素后,如果没有take出该元素,那么就无法继续添加第2个元素。
查看源码,得知,线程会一直在该put方法里面执行for(;;)循环
LinkedBlockingDeque
LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全,当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。
阻塞队列常用方法
抛出异常组
使用add,remove
方法往队列里面添加元素的时候如果添加的元素超过了队列的大小,或者移除队列元素的时候队列为空,就会抛出异常。代码如下
public class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
blockingQueue.add("a");
blockingQueue.add("b");
blockingQueue.add("c");
blockingQueue.element();
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
}
}
运行结果:
Exception in thread "main" java.util.NoSuchElementException
at java.util.AbstractQueue.remove(AbstractQueue.java:117)
at com.fangyajun.javasduty.juc.queue.BlockingQueueTest.main(BlockingQueueTest.java:19)
当队列多移除元素,或者多添加元素的时候,使用该方法就出抛出异常。
特殊值组
使用offer
方法往队列里面添加元素的时候如果添加的元素返回boolean值,表示添加失败或者成功.
使用poll
移除队列元素的时候,返回移除当前的元素,当队列为空的时候,返回null.peek
方法表查看元素,不会从队列里面取出。
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));
System.out.println("-----使用peek查看队列头元素-----");
System.out.println(blockingQueue.peek());
System.out.println("-----使用poll移除队列元素-----");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println("-----当队列为空时使用peek查看队列头元素-----");
System.out.println(blockingQueue.peek());
}
}
运行结果:
true
true
true
false
-----使用peek查看队列头元素-----
a
-----使用poll移除队列元素-----
a
b
c
null
null
-----当队列为空时使用peek查看队列头元素-----
null
从结果可以看出,使用offer添加元素的时候,返回boolean表示插入成功或者失败,使用poll移除元素的时候返回当前移除的元素,当队列为空的时候返回null。
阻塞组
当使用put,take
添加或者移除元素的时候,添加元素的时候,如果队列已满,则线程一直阻塞直到put成功或者响应失中断退出。当队列为空的时候,消费者试图从队列里面take数据,队列会一直阻塞消费者线程直到队列可用。
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
blockingQueue.put("d");
System.out.println("-------------------");
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
}
}
运行结果是程序会一直等待,没有中断
超时退出组
当线程队列满时,往队列添加元素,地漏会阻塞生产者线程一定时间,超过限时后,生产者线程退出
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);
blockingQueue.offer("a", 2, TimeUnit.SECONDS);
blockingQueue.offer("b", 2, TimeUnit.SECONDS);
blockingQueue.offer("c", 2, TimeUnit.SECONDS);
System.out.println("队列已满插入第4个元素:");
blockingQueue.offer("a", 2, TimeUnit.SECONDS);
}
}
运行结果:当插入第4个元素的时候,线程会阻塞2秒,没有插入成功退出。
true
true
true
队列已满插入第4个元素:
false