java 队列
本文参考文章链接:
https://www.cnblogs.com/lemon-flm/p/7877898.html
Queue:一个队列就是一个先入先出(FIFO)的数据结构
Queue接口与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Deque接 口。
队列分为阻塞队列和非阻塞队列
非阻塞队列有PriorityQueue和ConcurrentLinkedQueue
PriorityQueue:维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。从队列里取元素时,会按排的序取出元素,而不是先进先出的默认结构。
ConcurrentLinkedQueue:是基于链接节点的、线程安全的队列。并发访问不需要同步。ConcurrentLinkedQueue 收集关于队列大小的信息会很慢,需要遍历队列。需要高并发访问并且不要求计算队列大小的,可以使用ConcurrentLinkedQueue这个队列。
阻塞队列实现了BlockingQueue这个接口,下面有五个实现类,分别为:
ArrayBlockingQueue :一个由数组支持的有界队列。
LinkedBlockingQueue :一个由链接节点支持的可选有界队列。
PriorityBlockingQueue :一个由优先级堆支持的*优先级队列。
DelayQueue :一个由优先级堆支持的、基于时间的调度队列。
SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。
关于ArrayBlockingQueue和LinkedBlockingQueue的一点区别
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,而LinkedBlockingQueue之所以能够高效的处理并发数据,
还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。
这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别,所以你一般自己测试的时候,可能ArrayBlockingQueue的并发性能可能比LinkedBlockingQueue还高点。
队列里的基本方法:
add 增加一个元索 如果队列已满,则抛出一个IIIegaISlabEepeplian异常
remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常
offer 添加一个元素并返回true 如果队列已满,则返回false
poll 移除并返问队列头部的元素 如果队列为空,则返回null
peek 返回队列头部的元素 如果队列为空,则返回null
put 添加一个元素 如果队列满,则阻塞
take 移除并返回队列头部的元素 如果队列为空,则阻塞
队列的类结构:
测试代码演示:
public class QueueFactory<T> {
private Queue<T> queue = null;
/**
* ConcurrentLinkedQueue --- 非阻塞队列,
* ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。
* 从队列的尾部添加元素,从头部删除元素。
* 可以多个生产者线程和多个消费者线程同时操作队列,因为队列是线程安全的。
* ConcurrentLinkedQueue 收集队列大小的信息会很慢,需要遍历队列。
* 对不需要获取队列大小并且多线程操作的队列可以使用ConcurrentLinkedQueue
*
* @return
*/
public ConcurrentLinkedQueue<T> createConcurrentLinkedQueueInstance() {
queue = new ConcurrentLinkedQueue<>();
return (ConcurrentLinkedQueue<T>) queue;
}
/**
* 优先队列PriorityQueue --- 非阻塞队列
* 维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位
* @return
*/
public PriorityQueue<T> createPriorityQueueInstance() {
queue = new PriorityQueue<>(new Comparator<T>() {
@Override
public int compare(T o1, T o2) {
String s1 = o1.toString();
String s2 = o2.toString();
return s1.compareTo(s2);
}
});
return (PriorityQueue<T>)queue;
}
/**
* 阻塞队列ArrayBlockingQueue,底层由数组支持
* ArrayBlockingQueue是一个有界队列,队列为空时取元素会阻塞,队列为满时放元素会阻塞。
*
* @param queueSize 队列大小
* @return
*/
public ArrayBlockingQueue<T> createArrayBlockingQueueInstance(int queueSize) {
queue = new ArrayBlockingQueue<>(queueSize);
return (ArrayBlockingQueue<T>)queue;
}
/**
* 阻塞队列LinkedBlockingQueue,底层由链表支持
* LinkedBlockingQueue每次往队列里新增和删除元素时会生产和销毁一个Node对象,在长时间高并发处理大量数据时会对GC有一定的影响
* @return
*/
public LinkedBlockingQueue<T> createLinkedBlockingQueueInstance() {
queue = new LinkedBlockingQueue<>();
return (LinkedBlockingQueue<T>)queue;
}
/**
*
* @return
*/
public <M extends Delayed> DelayQueue<M> createDepayQueueInstance() {
Queue<M> delayQueue = new DelayQueue<>();
return (DelayQueue<M>)delayQueue;
}
}
/**
* 生产者线程
* 负责把任务加入到队列中
*
* @param <T>
*/
public class ProduceThread<T> implements Runnable {
private Queue<T> queue;
private boolean isBlockingQueue;
private T t;
private BlockingQueue<T> blockingQueue;
public ProduceThread(Queue<T> queue, boolean isBlockQueue, T t) {
super();
this.queue = queue;
this.isBlockingQueue = isBlockQueue;
this.t = t;
if(isBlockQueue) {
blockingQueue = (BlockingQueue<T>)queue;
}
}
@Override
public void run() {
while(true) {
System.out.println("threadname = " + Thread.currentThread().getName() + " 生产任务开始");
try {
if(isBlockingQueue) {
blockingQueue.put(t);
} else {
queue.offer(t);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("threadname = " + Thread.currentThread().getName() + " 生产 " + t.toString() + " 结束");
if(!(queue instanceof ConcurrentLinkedQueue)) {
System.out.println("threadname = " + Thread.currentThread().getName() + " 队列长度为 " + queue.size());
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消费者线程
* 负责从队列里取出任务消费
*
* @param <T>
*/
public class ConsumeThread<T> implements Runnable {
private Queue<T> queue;
private boolean isBlockingQueue;
private BlockingQueue<T> blockingQueue;
public ConsumeThread(Queue<T> queue, boolean isBlockingQueue) {
super();
this.queue = queue;
this.isBlockingQueue = isBlockingQueue;
if(isBlockingQueue) {
blockingQueue = (BlockingQueue<T>)queue;
}
}
@Override
public void run() {
while(true) {
try {
System.out.println("threadname = " + Thread.currentThread().getName() + " 消费任务开始");
T t = null;
if(isBlockingQueue) {
t = blockingQueue.take();
if(t instanceof DelayTask) {
DelayTask d = (DelayTask)t;
System.out.println("threadname = " + Thread.currentThread().getName()
+ DateUtil.dateTimeToString(new Date(), DateUtil.pattern2) + " content = "
+ d.getContent());
}
} else {
t = queue.poll();
}
System.out.println("threadname = " + Thread.currentThread().getName() + " 消费 t = " + t + " 结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
if(!(queue instanceof ConcurrentLinkedQueue)) {
System.out.println("threadname = " + Thread.currentThread().getName() + " 队列长度为 " + queue.size());
}
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class DelayTask implements Delayed {
private long delayTime;
private String content;
private long removeTime;
public DelayTask(long delayTime, String content) {
super();
this.delayTime = delayTime;
this.content = content;
this.removeTime = TimeUnit.MILLISECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();
}
public long getDelayTime() {
return delayTime;
}
public void setDelayTime(long delayTime) {
this.delayTime = delayTime;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public long getRemoveTime() {
return removeTime;
}
public void setRemoveTime(long removeTime) {
this.removeTime = removeTime;
}
@Override
public int compareTo(Delayed o) {
long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return diff > 0 ? 1 : (diff == 0 ? 0 : -1);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.currentTimeMillis(), unit);
}
}
public class QueueDemoTest {
private static QueueFactory<String> factory = new QueueFactory<>();
private static ExecutorService pool = ThreadPoolFactory.createThreadPoolInstance();
/**
* 测试ConcurrentLinkedQueue
* 多线程操作下安全的队列
*/
@Test
public void testConcurrentLinkedQueue() {
ConcurrentLinkedQueue<String> queue = factory.createConcurrentLinkedQueueInstance();
ProduceThread<String> produce = new ProduceThread<String>(queue, false, "hello concurrentLinkedQueue");
ConsumeThread<String> consume = new ConsumeThread<>(queue, false);
pool.execute(produce);
pool.execute(produce);
pool.execute(consume);
pool.execute(consume);
pool.execute(consume);
pool.execute(consume);
pool.execute(consume);
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}
/**
* 测试优先队列PriorityQueue,
* 添加到队列里的元素会按照天然顺序或构造函数的排序规则进行排序
*/
@Test
public void testPriorityQueue() {
PriorityQueue<String> queue = factory.createPriorityQueueInstance();
queue.offer("baobao");
queue.offer("kezhi");
queue.offer("haoqilai");
queue.offer("shuihaojiao");
queue.offer("yuping");
ConsumeThread<String> consume = new ConsumeThread<>(queue, false);
pool.execute(consume);
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}
/**
* 测试阻塞队列ArrayBlockingQueue,因为生产任务比消费任务快些,
* 所以队列满后,生产的线程是会阻塞的,等待消费的线程去消费掉队列里的消息
*/
@Test
public void testArrayBlockingQueue() {
ArrayBlockingQueue<String> queue = factory.createArrayBlockingQueueInstance(5);
ProduceThread<String> produce = new ProduceThread<String>(queue, true, "Hello ArrayBlockingQueue");
ConsumeThread<String> consume = new ConsumeThread<>(queue, true);
pool.execute(produce);
pool.execute(produce);
pool.execute(consume);
pool.execute(consume);
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}
/**
* 测试阻塞队列LinkedBlockingQueue
*/
@Test
public void testLinkedBlockingQueue() {
LinkedBlockingQueue<String> queue = factory.createLinkedBlockingQueueInstance();
ProduceThread<String> produce = new ProduceThread<>(queue, true, "Hello LinkedBlockingQueue");
ConsumeThread<String> consume = new ConsumeThread<>(queue, true);
for(int i = 0; i < 2; i++) {
pool.execute(produce);
}
for(int i = 0; i < 8; i++) {
pool.execute(consume);
}
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}
@Test
public void testDelayQueue() {
System.out.println(DateUtil.dateTimeToString(new Date(), DateUtil.pattern2));
DelayQueue<DelayTask> queue = factory.createDepayQueueInstance();
DelayTask d1 = new DelayTask(5000, "5秒到了任务执行");
DelayTask d2 = new DelayTask(10000, "10秒到了任务执行");
DelayTask d3 = new DelayTask(15000, "15秒到了任务执行");
DelayTask d4 = new DelayTask(20000, "20秒到了任务执行");
queue.put(d1);
queue.put(d2);
queue.put(d3);
queue.put(d4);
ConsumeThread<DelayTask> consume = new ConsumeThread<>(queue, true);
pool.execute(consume);
try {
Thread.sleep(25000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}
}
/**
* 自定义线程池工厂
*
*/
public class ThreadPoolFactory {
private static ExecutorService pool = null;
/**
* 创建自定义线程池
* @return
*/
public static ExecutorService createThreadPoolInstance() {
if (pool == null) {
pool = new ThreadPoolExecutor(20, 50, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(200),
new ThreadRejectedHandler());
}
return pool;
}
}
/**
* 这种拒绝策略是不抛出异常也不丢弃新任务,只是加任务时处于阻塞状态
*
*/
class ThreadRejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}