简单理解阻塞队列(BlockingQueue)中的take/put方法以及Condition存在的作用
程序员文章站
2022-03-27 12:52:45
简单理解阻塞队列(BlockingQueue)中的take/put方法以及Condition存在的作用package com.zhuyz.foundation.juc;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class MyArrayBlockingQueue<...
简单理解阻塞队列(BlockingQueue)中的take/put方法以及Condition存在的作用
-
Condition:可以理解成一把锁的一个钥匙,它既可以解锁(通知放行),又可以加锁(阻塞)
-
notFull:当队列元素满了时,阻塞生产,当队列元素存在元素但是没有满时,去通知消费
-
notEmpty:当队列中不存在元素时,阻塞消费,当队列元素存在元素时,去通知生产
while (count >= datas.length) {...}
while (count <= 0) {...}
两个while循环判断,而不用if,是因为线程不安全,
导致多线程场景下每个线程获取到的循环条件count的值存在差异,
导致代码执行异常,用while可以使当前线程重新刷新判断条件count的值。
- 用处:
ThreadPoolExecutor中使用到了阻塞队列,阻塞队列中又使用到了ReentrantLock,而ReentrantLock基于AQS。
package com.zhuyz.foundation.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyArrayBlockingQueue<T> {
// 数组元素
private T[] datas;
// 实际的元素个数(也有索引下标的作用)
private volatile int count;
private final ReentrantLock putLock = new ReentrantLock();
private final ReentrantLock takeLock = new ReentrantLock();
// 通知消费,暂停生产【当数组full时await(put时),当数组notFull时signal(take时)】
private Condition notFull = putLock.newCondition();
// 通知生产,暂停消费【当数组empty时await(take时),当数组notEmpty时signal(put时)】
private Condition notEmpty = takeLock.newCondition();
public MyArrayBlockingQueue(int cap) {
this.datas = (T[]) new Object[cap];
}
private void put(T t) {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 线程不安全,需要循环判断,插入值之前判断一下数组长度是否达到最大长度
while (count >= datas.length) {
System.out.println("datas is full, please waiting take");
notFull.await();
}
datas[count++] = t;
System.out.println("put: " + t);
} catch (Exception e) {
System.out.println("异常" + e);
} finally {
putLock.unlock();
}
// 通知获取元素的线程继续执行(take_thread)
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private T take() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
T t = null;
try {
// 线程不安全,需要循环判断,插入值之前判断一下数组中元素个数是否为0
while (count <= 0) {
System.out.println("datas is empty, please waiting put");
notEmpty.await();
}
// 获取元素
t = datas[--count];
System.out.println("take: " + t);
} catch (Exception e) {
System.out.println("异常" + e);
} finally {
takeLock.unlock();
}
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 通知插入元素的线程继续执行(put_thread)
notFull.signal();
} finally {
putLock.unlock();
}
return t;
}
public static void main(String[] args) throws InterruptedException {
MyArrayBlockingQueue<Integer> myArrayBlockingQueue = new MyArrayBlockingQueue<>(5);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> myArrayBlockingQueue.put(finalI)).start();
}
TimeUnit.SECONDS.sleep(5L);
for (int i = 0; i < 5; i++) {
new Thread(() -> myArrayBlockingQueue.take()).start();
}
}
}
结果如下:
从结果中可以看出,先put了5个元素,然后另外五个元素被阻塞住了,没有进去
take消费5个之后,另外五个被阻塞的元素就被put进去了
put: 0
put: 1
put: 2
put: 3
put: 4
datas is full, please waiting take
datas is full, please waiting take
datas is full, please waiting take
datas is full, please waiting take
datas is full, please waiting take
take: 4
put: 5
take: 5
take: 3
put: 6
put: 7
take: 7
put: 8
take: 8
put: 9
本文地址:https://blog.csdn.net/qq_43128724/article/details/110438987