欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ArrayBlockingQueue和LinkedBlockingQueue的区别

程序员文章站 2024-03-18 11:17:58
...
  • 本篇文章首先对阻塞队列中最常用的两个队列ArrayBlockingQueue、LinkedBlockingQueue做一个大致介绍,接着介绍使用阻塞队列实现生产者-消费者模式。

ArrayBlockingQueue

  • ArrayBlockingQueue是接口BlockingQueue的阻塞实现队列之一。基于数组实现的一个阻塞队列,在创建对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。它能够实现插入和取出的阻塞方法put()和take()方法其实也是通过使用通知模式来实现。查看源码就可以知道ArrayBlockingQueue生产者方放入数据、消费者取出数据都是使用同一把重入锁,这就两者无法真正的实现生产者和消费者的并行。

LinkedBlockingQueue

  • LinkedBlockingQueue也是接口BlockingQueue的阻塞实现队列之一。基于链表实现的一个阻塞队列,在创建对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE,所以要注意一个问题,如果初始化时没有指定容量,生产者放入元素远大于消费者取出元素的速度时,那么生产的元素一直在链表中存在,这会对内存造成很大压力。由于是基于链表的,所以生产者每次放入元素会构造一个新节点对象,在大量并发的情况下可能会对系统GC造成一定影响,而ArrayBlockingQueue不存在这种情况。LinkedBlockingQueue同样是使用通知模式来实现。相对于ArrayBlockingQueue,LinkedBlockingQueue生产者和消费者分别使用两把重入锁来实现同步,所以可以提高系统的并发度。
ArrayBlockingQueue LinkedBlockingQueue
阻塞实现方式 通知模式实现 通知模式实现
有界 有界,必须指定初始化大小, 有界,构造器可以不初始化,默认为INTEGER.MAX_VALUE
插入取出元素是否会创建和销毁元素 不会
生产者消费者锁的使用情况 使用一把锁 各自使用不同的锁

目前我测试出来的情况是ArrayBlockingQueue效果更好,可能是本机的并发度并不高

/**
 * fshows.com
 * Copyright (C) 2013-2019 All Rights Reserved.
 */
package com.example.springdemo.test.queue;


import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * -verbose:gc -Xms20M -Xmx20M -XX:+PrintGCDetails
 * @author xuleyan
 * @version TestArrayBlockingQueue.java, v 0.1 2019-11-17 9:50 PM xuleyan
 */
public class TestLinkedBlockingQueue {

    private static final int number = 1000000;
    /**
     * 多线程 10000        33ms
     * 多线程 100000       165ms
     * 多线程 1000000      657ms
     * 多线程 10000000     19453ms
     */
    private static LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<>(number);


    /**
     * 多线程 10000        29ms
     * 多线程 100000       107ms
     * 多线程 1000000      651ms
     * 多线程 10000000     13473ms
     */
    private static ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(number);

    private static CountDownLatch countDownLatch = new CountDownLatch(number);
    private static CountDownLatch countTakeDownLatch = new CountDownLatch(number);

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new BlockingThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        Date start = new Date();
        for (int i = 0; i < number; i++) {
            executor.execute(new BlockingOfferTask(i));

        }

        for (int i = 0; i < number; i++) {
            executor.execute(new BlockingTakeTask());
        }
        countDownLatch.await();
        countTakeDownLatch.await();
        System.out.println("耗时:" + ((new Date()).getTime() - start.getTime()) + "ms");
        executor.shutdown();


    }

    private static class BlockingOfferTask implements Runnable {

        private Integer num;

        public BlockingOfferTask(Integer num) {
            this.num = num;
        }

        @Override
        public void run() {
            // 队列满时返回false
            arrayBlockingQueue.offer(num);
            countDownLatch.countDown();
        }
    }

    private static class BlockingTakeTask implements Runnable {

        @Override
        public void run() {
            // 从头部取值并移除,为空会等待
            try {
                Integer num = arrayBlockingQueue.take();
                countTakeDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class BlockingThreadFactory implements ThreadFactory {

        private AtomicInteger integer = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("线程" + integer.incrementAndGet());
            return thread;
        }
    }
}

————————————————

参考文章链接:https://blog.csdn.net/Hollake/article/details/90053540