互联网技术10——queue队列
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue为代表的阻塞队列。两种都是继承了Queue接口。
层次图(hierarchy):(idea中 层次图快捷键:选中后(或在类中空白处)) ctrl+h )
1.concurrentLinkedQueue:
concurrentLinkedQueue是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常concurrentLinkedQueue性能要好于BlockedQueue。它是一个基于链接节点的*线程安全队列。该队列元素遵循先进先出的原则。头是最先加入的,尾是最后加入的,该队列不允许有null元素。
ConcurrentLinkedQueue重要方法:add()、offer()、poll()、peek()
1. add()和offer()都是加入元素的方法(在concurrentLinkedQueue中,这两个方法没有任何区别,之所以有两个相同的方法,是因为这两个方法都继承自Queue,其他场景可能不一样)。
2. pool()和peek()都是取头元素,区别在于前者会删除元素,后者不会
验证ConcurrentLinkedQueue是线程安全的:
package com.company;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Created by BaiTianShi on 2018/8/18.
*/
public class QueueTest {
public static void main(String[] args) {
ConcurrentLinkedQueue<String> qu = new ConcurrentLinkedQueue<>();
qu.add("a");
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
if(!qu.isEmpty()){
System.out.println("进入t1线程");
System.out.println("t1获取的元素"+qu.poll());
}
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
if(!qu.isEmpty()){
System.out.println("进入t2线程");
System.out.println("t2获取的元素"+qu.poll());
}
}
},"t2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
if(!qu.isEmpty()){
System.out.println("进入t3线程");
System.out.println("t3获取的元素"+qu.poll());
}
}
},"t3");
Thread t4 = new Thread(new Runnable() {
@Override
public void run() {
if(!qu.isEmpty()){
System.out.println("进入t4线程");
System.out.println("t4获取的元素"+qu.poll());
}
}
},"t4");
Thread t5 = new Thread(new Runnable() {
@Override
public void run() {
if(!qu.isEmpty()){
System.out.println("进入t5线程");
System.out.println("t5获取的元素"+qu.poll());
}
}
},"t5");
t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
}
}
运行结果:
进入t1线程
进入t3线程
进入t5线程
进入t2线程
t5获取的元素null
t3获取的元素null
t1获取的元素a
进入t4线程
t2获取的元素null
t4获取的元素null
可见看到,只有一个能拿到非空的值。其他线程拿不到a。在判断队列是否为空时,不要使用size,因为size要遍历一遍集合,使用isEmpty效率比较高,size()方法见下图。
ArrayBlockQueue
基于数组的阻塞队列实现,在ArrayBlockQueue内部维护里一个定长的数组,以便缓存队列中的数据对象,其内部没有实现读写分离,所以生产和消费不能完全并行。长度是需要定义的,并且可以指定先进先出或先进后出。它是有界队列,以内创建时必须指定长度。
方法:
添加元素 add()、put()、offer() 三个方法虽然都是添加元素,但是作用却不同。
1. put()方法,添加元素时,如果空间不够,将会一直等待,直到有元素被取出。
package com.company;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Created by BaiTianShi on 2018/8/19.
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
ArrayBlockingQueue<String> st = new ArrayBlockingQueue<String>(5);
try {
for(int i=0;i<7;i++){
st.put(String.valueOf(i));
System.out.println("添加了第"+i+"个");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果,可以看见,添加到第6个时,线程一直在等待。
2. add()方法,添加元素时,如果空间不够,这抛出抛出illegalStateException异常,如果有可用空间,则添加成功时返回true。
package com.company;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Created by BaiTianShi on 2018/8/19.
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
ArrayBlockingQueue<String> st = new ArrayBlockingQueue<String>(5);
for(int i=0;i<7;i++){
st.add(String.valueOf(i));
System.out.println("添加了第"+(i+1)+"个");
}
}
}
运行结果
3. offer()方法,添加元素时,有空位置则添加,并且返回ture,没有则返回fase。当向有界队列添加元素的时候,此方法优于add。
package com.company;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Created by BaiTianShi on 2018/8/19.
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
ArrayBlockingQueue<String> st = new ArrayBlockingQueue<String>(5);
for(int i=0;i<7;i++){
System.out.println("添第"+(i+1)+"个时返回"+st.offer(String.valueOf(i)));
}
}
}
运行结果:
LinkedBlockingQueue
通过linkedBlockQueue认识drainTo(List,n); 从队列中取出头部n个元素放到list中;
package com.company;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by BaiTianShi on 2018/8/19.
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
LinkedBlockingQueue<String> st = new LinkedBlockingQueue<String>(5);
for(int i=0;i<4;i++){
st.offer(String.valueOf(i));
}
List<String> li = new ArrayList<>();
st.drainTo(li,2);
for(String l : li){
System.out.println("取出的"+l);
}
for(int j=0;j<5; j++){
System.out.println("队列中剩余"+String.valueOf(st.poll()));
}
}
}
运行结果:可见取出0和1放到了list当中,
取出的0
取出的1
队列中剩余2
队列中剩余3
队列中剩余null
队列中剩余null
队列中剩余null
同时我们注意到,在这里我们虽然循环了十次,但只放进去0123这四个元素,因为这里指定了队列长度,这样它就是一个有界队列
当我们把这个长度去掉时
package com.company;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by BaiTianShi on 2018/8/19.
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
LinkedBlockingQueue<String> st = new LinkedBlockingQueue<String>();
for(int i=0;i<10;i++){
st.offer(String.valueOf(i));
}
System.out.println(st.size());
}
}
运行结果:可见,虽然我们没指定长度,但是循环10次每次都放进去了。改成100甚至1000都可以,这里不做过多演示。
SynchronouseQueue
下面看个例子,这个例子,貌似SynchronousQueue可以添加元素,如下所示。但是其实SynchronousQueue依然是没有存储元素的,这里之所以没有报错,是因为我们先启动了一个线程t1要消费SynchronousQueue这个队列中的元素,线程t2要向SynchronousQueue队列添加一个元素,这时候会发生什么呢?这时候,线程t2并不会真的把元素添加到队列中,而是直接将要添加的元素交给线程t1了。也就是说,SynchronousQueue队列还是不会真正存储元素的。
package com.company;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
/**
* Created by BaiTianShi on 2018/8/19.
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
final SynchronousQueue<String> st = new SynchronousQueue<String>();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(st.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
st.put("0");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t2.start();
}
}
运行结果
肯定有些人会有疑问,既然SynchronousQueue不能装任何元素的话,那么要它有何用?还有就是有界队列和*队列的应用场景是什么呢?如下图所示。
PriorityBlockingQueue
基于优先级的阻塞队列(优先级判断通过构造函数传入的Compatot对象来决定,也就是说传入队列的对象必须实现ComParable接口)。JDK中PriorityBlockingQueue内部控制线程同步的锁采用的是公平锁,它是一个*队列。
代码: task,实现了Comparable的compar方法
package com.company;
/**
* Created by BaiTianShi on 2018/8/20.
*/
public class Task implements Comparable<Task> {
private Integer id;
private String name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public int compareTo(Task Task) {
return this.id > Task.id ? 1 : (this.id < Task.id ? -1 : 0);
}
}
main方法:
package com.company;
import java.util.Iterator;
import java.util.concurrent.PriorityBlockingQueue;
/**
* Created by BaiTianShi on 2018/8/19.
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
final PriorityBlockingQueue<Task> st = new PriorityBlockingQueue<Task>();
Task ta4 = new Task();
ta4.setId(4);
ta4.setName("t4");
Task ta1 = new Task();
ta1.setId(1);
ta4.setName("t1");
Task ta3 = new Task();
ta3.setId(3);
ta4.setName("t3");
Task ta2 = new Task();
ta2.setId(2);
ta4.setName("t2");
st.add(ta3);
st.add(ta2);
st.add(ta4);
st.add(ta1);
for(Iterator<Task> iterator = st.iterator(); iterator.hasNext();){
System.out.println(iterator.next().getId());
}
Thread.sleep(3000);
System.out.println(st.poll().getId());
System.out.println(st.poll().getId());
System.out.println(st.poll().getId());
System.out.println(st.poll().getId());
}
}
可见,通过iterator遍历时是无效的,但是在取的时候是有序的。这样做的目的是为了避免每次放入元素时都要重新计算排序, 第一次取得时候进行排序,取走一个元素对顺序无任何影响。
DelayQueue
带有延迟时间的队列,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素,DelayQueue中的元素必须实现Delay接口。Delay中的元素必须实现Delay接口,Delay是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接处理的关闭等等。
这里使用DelayQueue模拟一下网吧上网的上机和下机过程,
网民实体类,实现Delayed接口
package com.company;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* Created by BaiTianShi on 2018/8/20.
*/
public class WangMin implements Delayed {
private Integer id;
private String name;
private long endTime;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
public WangMin(Integer id, String name, long endTime) {
this.id = id;
this.name = name;
this.endTime = endTime;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed de) {
WangMin w = (WangMin)de;
return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
}
}
模拟网吧开始营业及网面来上网的过程代码:
package com.company;
import java.util.concurrent.DelayQueue;
/**
* Created by BaiTianShi on 2018/8/20.
*/
public class WangBa implements Runnable {
//延时队列
private DelayQueue<WangMin> queue = new DelayQueue();
//是否营业
private boolean open = true;
public WangBa(boolean open) {
this.open = open;
}
//上机方法
//第三个参数是下机时间,上网时长加上当前时间就是下机时间
public void playStart(String name, Integer id, Integer money){
WangMin peo = new WangMin(id,name,1000*money+System.currentTimeMillis());
System.out.println("身份证"+id +"的用户"+name+"已上机");
queue.add(peo);
}
public void playEnd(WangMin wangMin){
System.out.println("身份证"+wangMin.getId() +"的用户"+wangMin.getName()+"上机时间到,请您下机");
}
@Override
public void run() {
try {
while(open){
WangMin wan = queue.take();
playEnd(wan);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//开始营业
final WangBa wangBa = new WangBa(true);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
wangBa.run();
}
});
t1.start();
wangBa.playStart("uzi",1,1);
wangBa.playStart("goGoing",2,3);
wangBa.playStart("farker",3,5);
wangBa.playStart("clearLove",4,2);
}
}
运行结果:
身份证1的用户uzi已上机
身份证2的用户goGoing已上机
身份证3的用户farker已上机
身份证4的用户clearLove已上机
身份证1的用户uzi上机时间到,请您下机
身份证4的用户clearLove上机时间到,请您下机
身份证2的用户goGoing上机时间到,请您下机
身份证3的用户farker上机时间到,请您下机
从运行状态上看,虽然全部下机了,但是线程仍未结束,根据实际情况,可通过设置while(open)中的open将轮询线程结束,例如使用volatitle修饰open
下一篇: 互联网技术02——脏读