欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

BlockingQueue

程序员文章站 2022-04-21 10:32:27
...
参考内容:http://tool.oschina.net/apidocs/apidoc?api=jdk-zh

一、BlockingQueue

1.JDK-API说明
public interface BlockingQueue<E> extends Queue<E>


BlockingQueue 支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用

说明:
生产者与消费者模式,正常情况下,一个生产,一个使用,不产生积压,也不缺货
异常情况下:
生产者速率高于消费者,供大于求,临时存放货物的空间不够用,再生产的东西没地方存放,即无法继续生产;
-- 此时需要阻塞生产者,让其等待,等消费者消费,有空间存放接下来的东西
-- 即存储元素时等待空间变得可用

生产者速率低于消费者,供不应求,临时存放货物的空间空着,即使想消费,也没有货物;
-- 此时需要阻塞消费者,让其等待生产者
-- 即获取元素时等待队列非空

2.
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:
第一种是抛出一个异常,
-- 操作时不符合规则的情况下,会抛出异常
第二种是返回一个特殊值(null 或 false,具体取决于操作),
-- 根据返回值判断此次操作是否成功
第三种是在操作可以成功前,无限期地阻塞当前线程,
-- 如:生产者需等待消费者消费直至有空余空间才会继续生产
第四种是在放弃前只在给定的最大时间限制内阻塞
-- 等待指定时间后,不再等待

抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用



二、ArrayBlockingQueue

1.
验证上面的方法
  • 存放数据,超过指定长度
  • 取出数据,数据为空


import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {

	/**
	 * @param args
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {
// 接口实现,ArrayBlocking 实质为数据,即有容量大小的限制,需使用前指定
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
		
//		inPutQueue(queue);
//		outPutQueue(queue);
		
		
	}
	
	public static void outPutQueue(BlockingQueue<String> blockingQueue) throws InterruptedException {
		
        // remove 方法 -- 抛出异常
		// 此时队列为空,无数据可供取出
        // Exception in thread "main" java.util.NoSuchElementException		
//		blockingQueue.remove();
		
		// poll 方法 -- 返回值
		// 返回取出的数据
		String operationReult = blockingQueue.poll();
		System.out.println(operationReult);
		
		// take 方法 -- 阻塞
		blockingQueue.take();
		
		// poll 方法 -- 阻塞一定时间
		blockingQueue.poll(3, TimeUnit.SECONDS);
	}
	
	public static void inPutQueue(BlockingQueue<String> blockingQueue) throws InterruptedException{
		
		blockingQueue.add("a");
		blockingQueue.add("a");
		blockingQueue.add("a");
		blockingQueue.add("a");
		blockingQueue.add("a");
		
		// add 方法	-- 操作失败抛出异常	
		// 执行下面代码时异常,以存放5个,再次存放,提示队列已满 -- 类比:数组越界,超过了数组的最大容量
		// Exception in thread "main" java.lang.IllegalStateException: Queue full
//				queue.add("a");
				
		// offer 方法  -- 操作失败有特殊的返回值
		// 不同于 add 同样是队列已满,但不抛出异常,接收返回值,结果为 false -- 添加失败
//				boolean operatonResult = queue.offer("a");
//				System.out.println(operatonResult);
		
		// put 方法  -- 阻塞程序运行
		// 抛出异常
		// 运行 -- 程序无法继续走下去,无法结束,阻塞到此行
//				blockingQueue.put("a");
		
		// offer("",time,time-unit) -- 阻塞指定时间
		// 运行 -- 程序阻塞,3秒后正常结束		
//				blockingQueue.offer("a", 3L, TimeUnit.SECONDS);
	}

}




即:
异常:add / remove
返回值:offer / poll
阻塞:put / take
阻塞定时: offer / poll

2.


/**
 * 模拟消费者与生产者
 * 队列容量为5
 * 生产者 每隔1s,放进一个数据
 * 消费者 每个3s,读取一个数据
 * 若使用:add remove 则会抛出  queue full 的异常
 * 若使用:put take 	则程序正常执行,会出现短暂的阻塞  
 */
public class BlockingQueueDemo1 {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
                // 底层实现是数组,必须是有界限
		BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
	        // 底层是链表,若未设置大小,默认大小为Integer.Max_value
		// BlockingQueue<String> queue = new LinkedBlockingDeque<String>(5);
		new Thread(new Producer(queue)).start();
		new Thread(new Consumer(queue)).start();
	}
	
}
class Producer implements Runnable{
	
	private BlockingQueue<String> queue ;
	
	public Producer(){
		
	}
	
	public Producer(BlockingQueue<String> queue){
		this.queue = queue ;
	}
	
	@Override
	public void run() {
		for(int i = 0 ; i< 100 ; i++){
			try {
				queue.put(String.valueOf(i));
				System.out.println("生产者--生产:"+i);
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

class Consumer implements Runnable{
	
	private BlockingQueue<String> queue ;
	
	public Consumer(){
		
	}
	
	public Consumer(BlockingQueue<String> queue){
		this.queue = queue ;
	}
	
	@Override
	public void run() {
		
		for(int i = 0 ; i < 100 ; i++){
			try {
				queue.take();
				System.out.println("消费者--消费:"+i);
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
}


三、LinkedBlockingDeque

与 ArrayBlockingQueue类似
底层实现为链表,无需设置边界,默认为边界值为Integer.MaxValue

四、DelayBlockingQueue

为每个元素设置超时时间,若在超时间内未执行获取该元素的操作,则在执行该元素取出操作时进行判断,若超过超时时间了,该元素被认为无效

五、PriorityBlockingQueue

为在队列中的元素进行排序处理,即不按照FIFO的原则处理数据

六、SynchronizeBlockingQueue

同步队列,只允许存放一个元素
若队列中已有元素,存放操作需阻塞,直到元素被取出

七、BlockingDequeue

双端队列,在队列两侧均可操作