JUC:堵塞队列BlockingQueue和同步SynchronousQueue
程序员文章站
2022-05-04 20:55:18
...
BlockingQueue
堵塞队列的特点:
- 写入:如果队列满了,就必须堵塞等待
- 取出:如果队列是空的,必须堵塞等待生产
堵塞队列的应用场景:线程池
继承关系图:
四组API:
方式 | 抛出异常 | 有返回值,不抛出异常 | 堵塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer | put | offer(obj,int,TimeUnit) |
移除 | remove | poll | take | poll(int,TimeUnit) |
首个元素 | element | peek |
- 抛出异常
ArrayBlockingQueue<Object> objectArrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(objectArrayBlockingQueue.add("a"));
System.out.println(objectArrayBlockingQueue.add("b"));
System.out.println(objectArrayBlockingQueue.add("c"));
System.out.println(objectArrayBlockingQueue.add("d"));
System.out.println(objectArrayBlockingQueue.element());
System.out.println("==================================");
System.out.println(objectArrayBlockingQueue.remove());
System.out.println(objectArrayBlockingQueue.remove());
System.out.println(objectArrayBlockingQueue.remove());
true
true
true
a
==================================
a
b
c
如果超过capacity或者取元素时队列为空,抛出异常:
Exception in thread "main" java.lang.IllegalStateException: Queue full
- 有返回值,不抛出异常
ArrayBlockingQueue<Object> objectArrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(objectArrayBlockingQueue.offer("a"));
System.out.println(objectArrayBlockingQueue.offer("b"));
System.out.println(objectArrayBlockingQueue.offer("c"));
System.out.println(objectArrayBlockingQueue.peek());
System.out.println("==================================");
System.out.println(objectArrayBlockingQueue.poll());
System.out.println(objectArrayBlockingQueue.poll());
System.out.println(objectArrayBlockingQueue.poll());
true
true
true
a
==================================
a
b
c
当从空队列取值/超过容量:
- 堵塞等待
ArrayBlockingQueue<Object> objectArrayBlockingQueue = new ArrayBlockingQueue<>(3);
objectArrayBlockingQueue.put("a");
objectArrayBlockingQueue.put("b");
objectArrayBlockingQueue.put("c");
System.out.println("放入三个元素");
System.out.println("================");
System.out.println(objectArrayBlockingQueue.take());
System.out.println(objectArrayBlockingQueue.take());
System.out.println(objectArrayBlockingQueue.take());
System.out.println(objectArrayBlockingQueue.take());
System.out.println("结束");
如果取出元素时队列为空/超过队列容量,程序等待:
- 超时等待
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
blockingQueue.offer("d",2,TimeUnit.SECONDS); // 等待超过2秒就退出
System.out.println("===============");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
blockingQueue.poll(2,TimeUnit.SECONDS);
SynchronousQueue
SynchronousQueue类实现了BlockingQueue接口。SynchronousQueue是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。
/**
* 同步队列
* 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
* put了一个元素,必须从里面先take取出来,否则不能在put进去值!
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); // 同步队
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
T1 put 1
T2=>1
T1 put 2
T2=>2
T1 put 3
T2=>3