欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

JUC——阻塞队列

程序员文章站 2022-07-26 12:48:56
Queue是一个队列,而队列的主要特征是FIFO先进先出,要实现生产者与消费者模型,也可以采用队列来进行中间的缓冲读取,好处是:生产者可以一直不停歇的生产数据。 BlockingQueue是Queue的子类,它实现有队列的基本特征: 在最初利用Queue实现生产者与消费者模型的时候发现一个问题:所有 ......

Queue是一个队列,而队列的主要特征是FIFO先进先出,要实现生产者与消费者模型,也可以采用队列来进行中间的缓冲读取,好处是:生产者可以一直不停歇的生产数据。

BlockingQueue是Queue的子类,它实现有队列的基本特征:

public interface BlockingQueue<E> extends Queue<E>

在最初利用Queue实现生产者与消费者模型的时候发现一个问题:所有的消费者可能不是一个轮流操作,而是有可能某一个消费者长期进行消费处理。

 

阻塞队列

BlockingQueue通常用于一个线程生成对象,而另外一个线程消费这些对象的场景。

JUC——阻塞队列

  一个线程将会持续生成新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点,也就是说,它是有限的。如果该阻塞队列到达了临界点,负责生产的线程将会再往里面插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。

 

BlockingQueue也是一个处理接口, 如果要想操作BlockingQueue也需要使用它的一系列子类:

JUC——阻塞队列

对于阻塞队列而言最基础的两个实现子类:数组的队列,链表的队列。

 

范例:使用BlockingQueue实现一个消费者与生产者模型

package so.strong.mall.concurrent;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueDemo {
    public static void main(String[] args) {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);//允许保存5个数据队列
        for (int i = 0; i < 3; i++) { //3个生产者
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 5; j++) {
                        try {
                            TimeUnit.SECONDS.sleep(j);
                            String str = "[生产数据{" + Thread.currentThread().getName() + "}] j = " + j;
                            queue.put(str); //会进入到生产的阻塞状态
                            System.out.println(str);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "生产者-" + i).start();
        }

        for (int i = 0; i < 5; i++) {  //5个消费者
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            TimeUnit.SECONDS.sleep(2);
                            if (queue.isEmpty()) { //队里内容为空
                                break;
                            }
                            System.out.println("[消费数据{" + Thread.currentThread().getName() + "}]" + queue.take());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "消费者-" + i).start();

        }
    }
}

除了数组之外也可以使用链表来进行操作:LinkedBlockingQueue。在使用这个类进行BlockingQueue接口对象实例化的时候,如果没有设置容量,则容量为:Integer.MAX_VALUE。

 

范例:修改为链表实现消费者与生产者模型

final BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);//允许保存5个数据队列

链表是通过索引在进行弹出数据的,而链表只需要弹出第一个元素即可。

 

范例:采用优先级的PriorityBlockingQueue来实现数据操作

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

PriorityBlockingQueue利用了Comparable接口来进行处理完成。

final BlockingQueue<String> queue = new PriorityBlockingQueue<>(5);//允许保存5个数据队列

对于使用哪一种具体的子类完全是由具体的开发环境来决定的。需要至少知道BlockingQueue这个阻塞队列核心就是提供有同步队列的功能。

 

SychronousQueue同步队列

之前使用的BlockingQueue每一次都可以保存多个数据对象信息,但是有些时候只能够允许保存一个数据的信息,这种情况下就要使用SychronousQueue子类来完成、

范例:使用同步队列来进行处理

package so.strong.mall.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class QueueDemo {
    public static void main(String[] args) {
        final BlockingQueue<String> queue = new SynchronousQueue<>();//允许保存5个数据队列
        for (int i = 0; i < 3; i++) { //3个生产者
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 5; j++) {
                        try {
                            TimeUnit.SECONDS.sleep(j);
                            String str = "[生产数据{" + Thread.currentThread().getName() + "}] j = " + j;
                            queue.put(str); //会进入到生产的阻塞状态
                            System.out.println(str);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "生产者-" + i).start();
        }

        for (int i = 0; i < 5; i++) {  //5个消费者
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            TimeUnit.SECONDS.sleep(2);
                            System.out.println("[消费数据{" + Thread.currentThread().getName() + "}]" + queue.take());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "消费者-" + i).start();
        }
    }
}

现在不关心有多少个消费者与生产者,都采用一个接一个的形式执行。

 

BlockingDeque双端阻塞队列

BlockingQueue主要特征是只能够从一方面获取数据,也就是说它按照一个队列的形式采用了FIFO处理完成。但是现在希望可以按照前后各自处理,那么就需要BlocingDeque。

一个BlockingDeque线程在双端队列的两端都可以插入数据和提取数据。

一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移除了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

 JUC——阻塞队列

BlockingDeque接口结构:

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E>

子类LinkedBlockingDeque类结构

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>,  java.io.Serializable

 

范例:观察双端队列的基本使用

package so.strong.mall.concurrent;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * @author Termis
 * @date 2018/5/21
 */
public class QueueDemo {
    public static void main(String[] args) {
        final BlockingDeque<String> deque = new LinkedBlockingDeque<>();
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 5; j++) {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                            String str;
                            if (j % 2 == 0) {
                                str = "[FIRST]生产数据{" + Thread.currentThread().getName() + "}y=" + j;
                                deque.addFirst(str);
                            } else {
                                str = "[LAST]生产数据{" + Thread.currentThread().getName() + "}y=" + j;
                                deque.addLast(str);
                            }
                            System.out.println(str);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "生产者-" + i).start();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println("[First]消费数据{" + Thread.currentThread().getName() + "}" + deque.takeFirst());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "消费者First").start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println("[Last]消费数据{" + Thread.currentThread().getName() + "}" + deque.takeLast());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "消费者Last").start();

    }
}

生产者和消费者模型的实现方案很多。对于双端队列一定清楚它本身还是一个队列。如果现在first已经拽干净了,那么就继续拽last,就会有可能出现first消费last的情况。

 

public class QueueDemo {
    public static void main(String[] args) throws Exception {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>();
        deque.addFirst("hello-first");
        deque.addFirst("world-first");
        deque.addLast("hello-last");
        deque.addLast("world-last");
        System.out.println(deque.takeFirst());
        System.out.println(deque.takeFirst());
        System.out.println(deque.takeFirst());
        System.out.println(deque.takeFirst());
    }
}
world-first
hello-first
hello-last
world-last

如果一端出现了阻塞,那么至少另外一端可以继续使用。