欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JUC:堵塞队列BlockingQueue和同步SynchronousQueue

程序员文章站 2022-05-04 20:55:18
...

BlockingQueue

堵塞队列的特点:

  • 写入:如果队列满了,就必须堵塞等待
  • 取出:如果队列是空的,必须堵塞等待生产

堵塞队列的应用场景:线程池
JUC:堵塞队列BlockingQueue和同步SynchronousQueue
JUC:堵塞队列BlockingQueue和同步SynchronousQueue继承关系图

JUC:堵塞队列BlockingQueue和同步SynchronousQueue

四组API

方式 抛出异常 有返回值,不抛出异常 堵塞等待 超时等待
添加 add offer put offer(obj,int,TimeUnit)
移除 remove poll take poll(int,TimeUnit)
首个元素 element peek
  1. 抛出异常
ArrayBlockingQueue<Object> objectArrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(objectArrayBlockingQueue.add("a"));
System.out.println(objectArrayBlockingQueue.add("b"));
System.out.println(objectArrayBlockingQueue.add("c"));
System.out.println(objectArrayBlockingQueue.add("d"));
System.out.println(objectArrayBlockingQueue.element());
System.out.println("==================================");
System.out.println(objectArrayBlockingQueue.remove());
System.out.println(objectArrayBlockingQueue.remove());
System.out.println(objectArrayBlockingQueue.remove());
true
true
true
a
==================================
a
b
c

如果超过capacity或者取元素时队列为空,抛出异常:

Exception in thread "main" java.lang.IllegalStateException: Queue full
  1. 有返回值,不抛出异常
ArrayBlockingQueue<Object> objectArrayBlockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(objectArrayBlockingQueue.offer("a"));
System.out.println(objectArrayBlockingQueue.offer("b"));
System.out.println(objectArrayBlockingQueue.offer("c"));

System.out.println(objectArrayBlockingQueue.peek());
System.out.println("==================================");
System.out.println(objectArrayBlockingQueue.poll());
System.out.println(objectArrayBlockingQueue.poll());
System.out.println(objectArrayBlockingQueue.poll());
true
true
true
a
==================================
a
b
c

当从空队列取值/超过容量:
JUC:堵塞队列BlockingQueue和同步SynchronousQueue

  1. 堵塞等待
ArrayBlockingQueue<Object> objectArrayBlockingQueue = new ArrayBlockingQueue<>(3);
objectArrayBlockingQueue.put("a");
objectArrayBlockingQueue.put("b");
objectArrayBlockingQueue.put("c");
System.out.println("放入三个元素");
System.out.println("================");
System.out.println(objectArrayBlockingQueue.take());
System.out.println(objectArrayBlockingQueue.take());
System.out.println(objectArrayBlockingQueue.take());
System.out.println(objectArrayBlockingQueue.take());
System.out.println("结束");

如果取出元素时队列为空/超过队列容量,程序等待:
JUC:堵塞队列BlockingQueue和同步SynchronousQueue

  1. 超时等待
 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 blockingQueue.offer("a");
 blockingQueue.offer("b");
 blockingQueue.offer("c");
  blockingQueue.offer("d",2,TimeUnit.SECONDS); // 等待超过2秒就退出
 System.out.println("===============");
 System.out.println(blockingQueue.poll());
 System.out.println(blockingQueue.poll());
 System.out.println(blockingQueue.poll());
blockingQueue.poll(2,TimeUnit.SECONDS);

SynchronousQueue

SynchronousQueue类实现了BlockingQueue接口。SynchronousQueue是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。

/**
 * 同步队列
 * 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
 * put了一个元素,必须从里面先take取出来,否则不能在put进去值!
 */
public class SynchronousQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); // 同步队
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+" put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName()+" put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName()+" put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T1").start();
        new Thread(()->{
            try {
              TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
               TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
              TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"T2").start();
    }
}
T1 put 1
T2=>1
T1 put 2
T2=>2
T1 put 3
T2=>3