欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java高并发编程---Condition

程序员文章站 2022-05-04 12:50:49
...

Condition接口

 任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

Object的监视器方法与Condition接口的对比
Java高并发编程---Condition

Condition接口与示例

 Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。

ConditionUseCase.java

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
	lock.lock();
	try {
			condition.await();
	} finally {
			lock.unlock();
	}
}
public void conditionSignal() throws InterruptedException {
	lock.lock();
	try {
			condition.signal();
	} finally {
			lock.unlock();
	}
}

 一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

Condition定义的(部分)方法以及描述

Java高并发编程---Condition

BoundedQueue.java

public class BoundedQueue<T> {
	private Object[] items;
	// 添加的下标,删除的下标和数组当前数量
	private int addIndex, removeIndex, count;
	private Lock lock = new ReentrantLock();
	private Condition notEmpty = lock.newCondition();
	private Condition notFull = lock.newCondition();
	public BoundedQueue(int size) {
			items = new Object[size];
	}
	// 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
	public void add(T t) throws InterruptedException {
		lock.lock();
		try {
				while (count == items.length)
				notFull.await();
				items[addIndex] = t;
				if (++addIndex == items.length)
				addIndex = 0;
				++count;
				notEmpty.signal();
		} finally {
				lock.unlock();
		}
	}
	// 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
	@SuppressWarnings("unchecked")
	public T remove() throws InterruptedException {
		lock.lock();
		try {
				while (count == 0)
				notEmpty.await();
				Object x = items[removeIndex];
				if (++removeIndex == items.length)
						removeIndex = 0;
				--count;
				notFull.signal();
				return (T) x;
		} finally {
				lock.unlock();
		}
	}
}

上述示例中,BoundedQueue通过add(T t)方法添加一个元素,通过remove()方法移出一个元素。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”。
 首先需要获得锁,目的是确保数组修改的可见性和排他性。当数组数量等于数组长度时,表示数组已满,则调用notFull.await(),当前线程随之释放锁并进入等待状态。如果数组数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数组中已经有新元素可以获取。

Condition的实现分析

 ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

等待队列
 等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
 一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如图所示。
Java高并发编程---Condition
 这个模型是不是和前面同步器的模型十分相像?千万不要混淆了,下面把两个模型结合起来:
Java高并发编程---Condition
 如图,Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。

等待
 调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

ConditionObject的await方法

public final void await() throws InterruptedException {
	if (Thread.interrupted())
			throw new InterruptedException();
	// 当前线程加入等待队列
	Node node = addConditionWaiter();
	// 释放同步状态,也就是释放锁
	int savedState = fullyRelease(node);
	int interruptMode = 0;
	while (!isOnSyncQueue(node)) {
			LockSupport.park(this);
			if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
					break;
	}
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
			interruptMode = REINTERRUPT;
	if (node.nextWaiter != null)
			unlinkCancelledWaiters();
	if (interruptMode != 0)
			reportInterruptAfterWait(interruptMode);
}

 调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
 当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
 同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中。

通知
 调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
 线程加入等待队列示意图
Java高并发编程---Condition

ConditionObject的signal方法

public final void signal() {
	if (!isHeldExclusively())
			throw new IllegalMonitorStateException();
	Node first = firstWaiter;
	if (first != null)
			doSignal(first);
}

 调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。