欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

详解Java阻塞队列(BlockingQueue)的实现原理

程序员文章站 2023-12-16 19:46:52
阻塞队列 (blockingqueue)是java util.concurrent包下重要的数据结构,blockingqueue提供了线程安全的队列访问方式:当阻塞队列进行...

阻塞队列 (blockingqueue)是java util.concurrent包下重要的数据结构,blockingqueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于blockingqueue实现的。

blockingqueue 的操作方法

blockingqueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

详解Java阻塞队列(BlockingQueue)的实现原理

四组不同的行为方式解释:

  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false)。

无法向一个 blockingqueue 中插入 null。如果你试图插入 null,blockingqueue 将会抛出一个 nullpointerexception。

可以访问到 blockingqueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

blockingqueue 的实现类

blockingqueue 是个接口,你需要使用它的实现之一来使用blockingqueue,java.util.concurrent包下具有以下 blockingqueue 接口的实现类:

  1. arrayblockingqueue:arrayblockingqueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
  2. delayqueue:delayqueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.delayed 接口。
  3. linkedblockingqueue:linkedblockingqueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 integer.max_value 作为上限。
  4. priorityblockingqueue:priorityblockingqueue 是一个*的并发队列。它使用了和类 java.util.priorityqueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 priorityblockingqueue 的元素必须实现 java.lang.comparable 接口。因此该队列中元素的排序就取决于你自己的 comparable 实现。
  5. synchronousqueue:synchronousqueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

使用例子:

阻塞队列的最长使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。使用者不用关心什么阻塞生产,什么时候阻塞消费,使用非常方便,代码如下:

package mythread;

import java.util.random;
import java.util.concurrent.blockingqueue;
import java.util.concurrent.linkedblockingqueue;
import java.util.concurrent.timeunit;

public class blockingqueuetest {
  //生产者
  public static class producer implements runnable{
    private final blockingqueue<integer> blockingqueue;
    private volatile boolean flag;
    private random random;

    public producer(blockingqueue<integer> blockingqueue) {
      this.blockingqueue = blockingqueue;
      flag=false;
      random=new random();

    }
    public void run() {
      while(!flag){
        int info=random.nextint(100);
        try {
          blockingqueue.put(info);
          system.out.println(thread.currentthread().getname()+" produce "+info);
          thread.sleep(50);
        } catch (interruptedexception e) {
          // todo auto-generated catch block
          e.printstacktrace();
        }        
      }
    }
    public void shutdown(){
      flag=true;
    }
  }
  //消费者
  public static class consumer implements runnable{
    private final blockingqueue<integer> blockingqueue;
    private volatile boolean flag;
    public consumer(blockingqueue<integer> blockingqueue) {
      this.blockingqueue = blockingqueue;
    }
    public void run() {
      while(!flag){
        int info;
        try {
          info = blockingqueue.take();
          system.out.println(thread.currentthread().getname()+" consumer "+info);
          thread.sleep(50);
        } catch (interruptedexception e) {
          // todo auto-generated catch block
          e.printstacktrace();
        }        
      }
    }
    public void shutdown(){
      flag=true;
    }
  }
  public static void main(string[] args){
    blockingqueue<integer> blockingqueue = new linkedblockingqueue<integer>(10);
    producer producer=new producer(blockingqueue);
    consumer consumer=new consumer(blockingqueue);
    //创建5个生产者,5个消费者
    for(int i=0;i<10;i++){
      if(i<5){
        new thread(producer,"producer"+i).start();
      }else{
        new thread(consumer,"consumer"+(i-5)).start();
      }
    }

    try {
      thread.sleep(1000);
    } catch (interruptedexception e) {
      // todo auto-generated catch block
      e.printstacktrace();
    }
    producer.shutdown();
    consumer.shutdown();

  }
}

阻塞队列原理:

其实阻塞队列实现阻塞同步的方式很简单,使用的就是是lock锁的多条件(condition)阻塞控制。使用blockingqueue封装了根据条件阻塞线程的过程,而我们就不用关心繁琐的await/signal操作了。

下面是jdk 1.7中arrayblockingqueue部分代码:

public arrayblockingqueue(int capacity, boolean fair) {

    if (capacity <= 0)
      throw new illegalargumentexception();
    //创建数组  
    this.items = new object[capacity];
    //创建锁和阻塞条件
    lock = new reentrantlock(fair);  
    notempty = lock.newcondition();
    notfull = lock.newcondition();
  }
//添加元素的方法
public void put(e e) throws interruptedexception {
    checknotnull(e);
    final reentrantlock lock = this.lock;
    lock.lockinterruptibly();
    try {
      while (count == items.length)
        notfull.await();
      //如果队列不满就入队
      enqueue(e);
    } finally {
      lock.unlock();
    }
  }
 //入队的方法
 private void enqueue(e x) {
    final object[] items = this.items;
    items[putindex] = x;
    if (++putindex == items.length)
      putindex = 0;
    count++;
    notempty.signal();
  }
 //移除元素的方法
 public e take() throws interruptedexception {
    final reentrantlock lock = this.lock;
    lock.lockinterruptibly();
    try {
      while (count == 0)
        notempty.await();
      return dequeue();
    } finally {
      lock.unlock();
    }
  }
 //出队的方法
 private e dequeue() {
    final object[] items = this.items;
    @suppresswarnings("unchecked")
    e x = (e) items[takeindex];
    items[takeindex] = null;
    if (++takeindex == items.length)
      takeindex = 0;
    count--;
    if (itrs != null)
      itrs.elementdequeued();
    notfull.signal();
    return x;

双端阻塞队列(blockingdeque)

concurrent包下还提供双端阻塞队列(blockingdeque),和blockingqueue是类似的,只不过blockingdeque提供从任意一端插入或者抽取元素的队列。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

上一篇:

下一篇: