Java多线程(十一) Java中的阻塞队列
Java多线程(十一) Java中的阻塞队列
阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。在阻塞队列不可用时,这两个附加操作提供了4种处理方式:
方法 / 处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
抛出异常
当队列满了以后,再向队列中插入元素,会抛出 IllegalStateException (Queue Full)异常。当队列空了的话,从队列中获取元素就会抛出 NoSuchElementException 异常。
public class MyBlockQueue {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);
new Thread(()->{
q.add(1);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
q.add(2);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
q.add(3);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
},"线程1").start();
new Thread(()->{
q.remove();
System.out.println(Thread.currentThread().getName()+"拿走一个元素");
q.remove();
System.out.println(Thread.currentThread().getName()+"拿走一个元素");
q.remove();
System.out.println(Thread.currentThread().getName()+"拿走一个元素");
q.remove();
System.out.println(Thread.currentThread().getName()+"拿走一个元素");
},"线程2").start();
}
}
返回特殊值
当向队列中插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则从队列中取出一个元素,如果没有那么返回false。
public class MyBlockQueue {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);
new Thread(()->{
q.offer(1);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
q.offer(2);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
q.offer(3);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
},"线程1").start();
new Thread(()->{
Integer res = q.poll();
System.out.println(Thread.currentThread().getName()+"拿走一个元素,返回值是 "+res);
res = q.poll();
System.out.println(Thread.currentThread().getName()+"拿走一个元素,返回值是 "+res);
res = q.poll();
System.out.println(Thread.currentThread().getName()+"拿走一个元素,返回值是 "+res);
res = q.poll();
System.out.println(Thread.currentThread().getName()+"拿走一个元素,返回值是 "+res);
},"线程2").start();
}
}
一直阻塞
当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
public class MyBlockQueue {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);
new Thread(()->{
try {
q.put(1);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
Thread.sleep(1000);
q.put(2);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
Thread.sleep(1000);
q.put(3);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
System.out.println("休息4秒后再插入值");
Thread.sleep(4000);
q.put(4);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程1").start();
new Thread(()->{
try {
q.take();
System.out.println(Thread.currentThread().getName()+"拿走一个元素");
q.take();
System.out.println(Thread.currentThread().getName()+"拿走一个元素");
q.take();
System.out.println(Thread.currentThread().getName()+"拿走一个元素");
q.take();
System.out.println(Thread.currentThread().getName()+"拿走一个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程2").start();
}
}
在这个例子中,线程1在插入第四个元素之前休眠了4秒钟,那么线程2一直阻塞等待线程1再次插入值。
超时退出
当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
在在这种情况的一对方法下,和offer(e,time,unit) 和 poll(time,unit) 有两个共同的参数:time和unit,time指演示等待的时间值,而unit指这个时间值的单位,这个单位是TimeUnit下的单位,有
- TimeUnit.NANOSECONDS; (纳秒)
- TimeUnit.MICROSECONDS; (微秒)
- TimeUnit.MILLISECONDS;(毫秒)
- TimeUnit.SECONDS; (秒)
- TimeUnit.MINUTES; (分)
- TimeUnit.HOURS; (时)
- TimeUnit.DAYS;(天)
public class MyBlockQueue {
public static void main(String[] args) {
ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);
new Thread(()->{
try {
q.offer(1);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
Thread.sleep(1000);
q.offer(2);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
Thread.sleep(1000);
q.offer(3);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
System.out.println("休息4秒后再插入值");
Thread.sleep(4000);
q.offer(4);
System.out.println(Thread.currentThread().getName()+"放入一个元素");
System.out.println("-----------------"+Thread.currentThread().getName()+"执行完毕,退出----------");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程1").start();
new Thread(()->{
try {
Integer res = q.poll(1000,TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName()+"拿走一个元素为 "+res);
res = q.poll(1000,TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName()+"拿走一个元素为 "+res);
res = q.poll(1000,TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName()+"拿走一个元素为 "+res);
res = q.poll(1000,TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName()+"拿走一个元素为 "+res);
System.out.println("-----------------"+Thread.currentThread().getName()+"执行完毕,退出----------");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程2").start();
}
}
在这个例子中,线程2取值时设定1秒内取不到值就退出,因此在线程1休眠的4秒钟内,线程2因为取不到值而直接退出。
Java中的阻塞队列
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
- PriorityBlockingQueue:一个支持优先级排序的*阻塞队列
- DelayQueue:一个使用优先级队列实现的秃界阻塞队列
- SynchronousQueue:一个不存储元素的阻塞队列
- LinkedTransferQueue:一个由链表结构组成的*阻塞队列
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素。
可以把SynchronousQueue看作是只能拿一个球的传球手,负责把生产者线程处理的数据直接传递给消费者线程。此外,它支持公平访问队列,默认情况下线程采用非公平访问策略。
public class MyQueue {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();//不存储元素,put一个元素,必须就要从里面take取出来,否则不能再put
new Thread(()->{
for (int i = 0; i < 3; i++) {
try {
queue.put(i);
System.out.println(Thread.currentThread().getName()+"放了一个元素 "+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程A").start();
new Thread(()->{
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
queue.take();
System.out.println(Thread.currentThread().getName()+"取出来了一个元素 "+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"线程B").start();
}
}
阻塞队列的实现原理
使用通知模式,所谓通知模式就是当生产者往满的队列中添加元素时会阻塞住生产者,当消费者消费一个队列中的元素后,会通知生产者当前队列可用,ArrayBlockingQueue使用了Condition来实现,关于Condition的内容可以查看这一篇博客《Java多线程(八)生产者消费者——Condition和精准唤醒》。
上一篇: 深入浅出理解海象运算符:=
下一篇: python多线程运行代码