玩转高并发系列----锁(三)
程序员文章站
2022-04-15 19:04:01
这一章我们将着重讲解Condition的基本用法以及其实现原理,让读者不光知其然,更知其所以然。并且将带领大家一行一行通过源码的方式剖析其实现细节。基本用法Condition本身是一个接口,其功能和用法类似wait()和notify()。并且在使用Condition时,必须和Lock一起使用,即必须获取到Condition对应的lock锁之后,才能调用Condition的阻塞(await)和唤醒(signal)方法。先看一下Condition的接口声明。public interface Condi...
这一章我们将着重讲解Condition的基本用法以及其实现原理,让读者不光知其然,更知其所以然。并且将带领大家一行一行通过源码的方式剖析其实现细节。
基本用法
- Condition本身是一个接口,其功能和用法类似wait()和notify()。并且在使用Condition时,必须和Lock一起使用,即必须获取到Condition对应的lock锁之后,才能调用Condition的阻塞(await)和唤醒(signal)方法。先看一下Condition的接口声明。
public interface Condition {
// 阻塞当前线程,同时释放锁,等待被signal方法唤醒
void await() throws InterruptedException;
// 阻塞当前线程,并等待指定的ns,超时返回
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 唤醒处于当前condition阻塞队列头部的线程
void signal();
// 唤醒所有阻塞在当前condition的线程
void signalAll();
......
}
- Condition的使用场景
以ArrayBlockingQueue为例,通过内部维护一个lock锁,两个condition,实现了对队列头线程(读线程)和尾部(写线程)的精准唤醒。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 通过数组实现的阻塞队列,存放数组的元素
final Object[] items;
// 读索引,即队列的头部:队列只能从头部读取数据
int takeIndex;
// 写索引,即队列的尾部:队列只能向尾部插入数据
int putIndex;
// 通过互斥锁实现了线程安全
final ReentrantLock lock;
// 队列为空时,阻塞读线程,队列不为空时,唤醒读线程
private final Condition notEmpty;
// 队列满时,阻塞写线程;队列不满时,唤醒写线程
private final Condition notFull;
// 阻塞队列的插入操作:队列满,则阻塞
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 获取当前锁:注意:执行condition的方式时,必须获取到锁
lock.lockInterruptibly();
try {
// count == items.length表示队列已满,阻塞写线程,即put操作
while (count == items.length)
notFull.await();
// 队列不满时,执行插入队列的逻辑,代码看下面
enqueue(e);
} finally {
lock.unlock();
}
}
// 向队列中插入元素的方法
private void enqueue(E x) {
// 此时线程持有lock锁,线程安全
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 插入完成后,队列肯定非空,所以需要唤醒处于阻塞状态的读线程
notEmpty.signal();
}
// 读取元素的方法:即从队列头部获取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// count==0表示队列为空,阻塞读线程,即take方法
while (count == 0)
notEmpty.await();
// 执行具体的读取操作,代码如下
return dequeue();
} finally {
lock.unlock();
}
}
// 具体的读取元素的操作
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
// 此时线程持有lock锁,线程安全
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 读取完成,说明队列肯定不满,所以需要唤醒处于阻塞状态的写线程
notFull.signal();
return x;
}
......
}
Condition的实现原理
- 使用Condition可以有效避免wait/notify使用中生产者通知生产者,消费者通知消费者的情况。另外,重要的事情说三遍:Condition必须和Lock结合使用,Condition必须和Lock结合使用,Condition必须和Lock结合使用。
- 读写锁中的读锁(ReadLock)不支持Condition,写锁(WriteLock)和互斥锁(ReentrantLock)都是支持
Condition
的。它们各自调用各自的Sync
内部类的newCondition()
方法,而Sync
类又是调用的AQS中的newCondition()
方法。因此最终Condition
的实现都来自AQS。
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
// ConditionObject是AQS的一个内部类
return new ConditionObject();
}
}
- 由于每个Condition上会阻塞对个线程,很显然,每个Condition内部会维护一个阻塞队列。在ConditionObject内部通过firstWaiter和lastWaiter两个指针形成一个单向链表,实现了阻塞队列。
public class ConditionObject implements Condition, java.io.Serializable {
/** 条件队列的头结点. */
private transient Node firstWaiter;
/** 条件队列的尾节点 */
private transient Node lastWaiter;
}
-
await()
方法源码分析
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加到该Condition对象的条件队列中:即firstWaiter和lastWaiter构成的单向链表的尾部
Node node = addConditionWaiter();
// 释放锁:阻塞在Condition之前,必须释放锁,否则会死锁。
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前节点是否在AQS的阻塞队列。
// 注意: 这里是是否在AQS的阻塞队列,而不是condition的条件队列,若不在,则阻塞自己。
// 初始化时,当前节点是放在条件队列的。调用signal方法时,会将条件队列队头的节点移动到阻塞队列。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 响应中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// node节点被唤醒从,从阻塞队列中获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 当前节点不是条件队列的最后一个节点,所以需要移除已取消的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 将当前线程包装成一个Node后,加入到条件队列尾部
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
isOnSyncQueue(node) 用于判断该Node是否在AQS的同步队列里面。初始的时候,是放在condition的条件队列的,而不在AQS的阻塞队列。但执行signal方法时,会被放进AQS的阻塞队列。
-
signal()
方法剖析
同await操作一样,在调用signal的时候,必须先拿到锁(否则会抛出异常)。
public final void signal() {
// 校验当前线程:只有持有锁的线程,才有资格调用signal方法
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒该Condition对象的条件队列的头结点线程
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 通过do-while循环,从condition的条件队列中找到第一个需要被移动到阻塞队列的节点,即为被唤醒的节点。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 执行唤醒操作的逻辑方法
final boolean transferForSignal(Node node) {
// 当前节点的waitSattus != Node.CONDITION,则表示该线程已取消。直接返回false。
// 从doSignal方法可知,返回false时,继续下一次循环,寻找下一个节点。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 如果当前节点没有取消,则调用AQS中的方法,将当前节点放入阻塞队列尾部
// 这个地方就和await方法中的isOnSyncQueue(node)方法向对应
// 注意:这里的p是node节点的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
注意:
signal()
方法只是单纯的将Condition条件队列(即firstWaiter
和lastWaiter
构成的单向链表)的第一个可唤醒的节点(不一定是头结点)转移到AQS的阻塞队列,并没有执行unpark()
方法唤醒处于await状态的线程。真正唤醒线程的操作仍然是在lock.unlock()
方法中完成的。
本文地址:https://blog.csdn.net/youngboy_su/article/details/107871242
上一篇: 算法:现在你在玩一款游戏,叫做节奏大师。它有三种判定
下一篇: 某校2019专硕编程题-队列