欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

玩转高并发系列----锁(三)

程序员文章站 2022-07-10 18:29:46
这一章我们将着重讲解Condition的基本用法以及其实现原理,让读者不光知其然,更知其所以然。并且将带领大家一行一行通过源码的方式剖析其实现细节。基本用法Condition本身是一个接口,其功能和用法类似wait()和notify()。并且在使用Condition时,必须和Lock一起使用,即必须获取到Condition对应的lock锁之后,才能调用Condition的阻塞(await)和唤醒(signal)方法。先看一下Condition的接口声明。public interface Condi...

这一章我们将着重讲解Condition的基本用法以及其实现原理,让读者不光知其然,更知其所以然。并且将带领大家一行一行通过源码的方式剖析其实现细节。

基本用法

  1. Condition本身是一个接口,其功能和用法类似wait()和notify()。并且在使用Condition时,必须和Lock一起使用,即必须获取到Condition对应的lock锁之后,才能调用Condition的阻塞(await)和唤醒(signal)方法。先看一下Condition的接口声明。
public interface Condition {
	// 阻塞当前线程,同时释放锁,等待被signal方法唤醒
	void await() throws InterruptedException;
	// 阻塞当前线程,并等待指定的ns,超时返回
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // 唤醒处于当前condition阻塞队列头部的线程
    void signal();
    // 唤醒所有阻塞在当前condition的线程
    void signalAll();
	......
}
  1. Condition的使用场景

以ArrayBlockingQueue为例,通过内部维护一个lock锁,两个condition,实现了对队列头线程(读线程)和尾部(写线程)的精准唤醒。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        // 通过数组实现的阻塞队列,存放数组的元素
        final Object[] items;
        // 读索引,即队列的头部:队列只能从头部读取数据
        int takeIndex;
        // 写索引,即队列的尾部:队列只能向尾部插入数据
        int putIndex;
        // 通过互斥锁实现了线程安全
        final ReentrantLock lock;
        // 队列为空时,阻塞读线程,队列不为空时,唤醒读线程
        private final Condition notEmpty;
        // 队列满时,阻塞写线程;队列不满时,唤醒写线程
        private final Condition notFull;
        
        // 阻塞队列的插入操作:队列满,则阻塞
		public void put(E e) throws InterruptedException {
        	checkNotNull(e);
        	final ReentrantLock lock = this.lock;
        	// 获取当前锁:注意:执行condition的方式时,必须获取到锁
        	lock.lockInterruptibly();
        	try {
        	// count == items.length表示队列已满,阻塞写线程,即put操作
            	while (count == items.length)
                	notFull.await();
               	// 队列不满时,执行插入队列的逻辑,代码看下面
            	enqueue(e);
        	} finally {
            	lock.unlock();
        	}
    	}
    	
    	// 向队列中插入元素的方法
    	private void enqueue(E x) {
    	// 此时线程持有lock锁,线程安全
        	final Object[] items = this.items;
        	items[putIndex] = x;
        	if (++putIndex == items.length)
            	putIndex = 0;
        	count++;
        	// 插入完成后,队列肯定非空,所以需要唤醒处于阻塞状态的读线程
        	notEmpty.signal();
    	}
    	
    	// 读取元素的方法:即从队列头部获取元素
    	public E take() throws InterruptedException {
        	final ReentrantLock lock = this.lock;
        	lock.lockInterruptibly();
        	try {
        	// count==0表示队列为空,阻塞读线程,即take方法
            	while (count == 0)
                	notEmpty.await();
               // 执行具体的读取操作,代码如下
            	return dequeue();
        	} finally {
            	lock.unlock();
        	}
    	}
    	// 具体的读取元素的操作
		private E dequeue() {
        	// assert lock.getHoldCount() == 1;
        	// assert items[takeIndex] != null;
        	// 此时线程持有lock锁,线程安全
        	final Object[] items = this.items;
        	@SuppressWarnings("unchecked")
        	E x = (E) items[takeIndex];
        	items[takeIndex] = null;
        	if (++takeIndex == items.length)
            	takeIndex = 0;
        	count--;
        	if (itrs != null)
            	itrs.elementDequeued();
            // 读取完成,说明队列肯定不满,所以需要唤醒处于阻塞状态的写线程
        	notFull.signal();
        	return x;
    	}
    	......
}

Condition的实现原理

  1. 使用Condition可以有效避免wait/notify使用中生产者通知生产者,消费者通知消费者的情况。另外,重要的事情说三遍:Condition必须和Lock结合使用,Condition必须和Lock结合使用,Condition必须和Lock结合使用。
  2. 读写锁中的读锁(ReadLock)不支持Condition,写锁(WriteLock)和互斥锁(ReentrantLock)都是支持Condition的。它们各自调用各自的Sync内部类的newCondition()方法,而Sync类又是调用的AQS中的newCondition()方法。因此最终Condition的实现都来自AQS。
    abstract static class Sync extends AbstractQueuedSynchronizer {
        final ConditionObject newCondition() {
        // ConditionObject是AQS的一个内部类
            return new ConditionObject();
        }
	}
  1. 由于每个Condition上会阻塞对个线程,很显然,每个Condition内部会维护一个阻塞队列。在ConditionObject内部通过firstWaiter和lastWaiter两个指针形成一个单向链表,实现了阻塞队列。
    public class ConditionObject implements Condition, java.io.Serializable {
        /** 条件队列的头结点. */
        private transient Node firstWaiter;
        /** 条件队列的尾节点 */
        private transient Node lastWaiter;
    }
  1. await()方法源码分析
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加到该Condition对象的条件队列中:即firstWaiter和lastWaiter构成的单向链表的尾部
            Node node = addConditionWaiter();
            // 释放锁:阻塞在Condition之前,必须释放锁,否则会死锁。
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 判断当前节点是否在AQS的阻塞队列。
            // 注意: 这里是是否在AQS的阻塞队列,而不是condition的条件队列,若不在,则阻塞自己。
            // 初始化时,当前节点是放在条件队列的。调用signal方法时,会将条件队列队头的节点移动到阻塞队列。
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // 响应中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // node节点被唤醒从,从阻塞队列中获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
            // 当前节点不是条件队列的最后一个节点,所以需要移除已取消的节点
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

		// 将当前线程包装成一个Node后,加入到条件队列尾部
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

isOnSyncQueue(node) 用于判断该Node是否在AQS的同步队列里面。初始的时候,是放在condition的条件队列的,而不在AQS的阻塞队列。但执行signal方法时,会被放进AQS的阻塞队列。

  1. signal()方法剖析

同await操作一样,在调用signal的时候,必须先拿到锁(否则会抛出异常)。

        public final void signal() {
        // 校验当前线程:只有持有锁的线程,才有资格调用signal方法
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
                
            // 唤醒该Condition对象的条件队列的头结点线程
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        
        // 通过do-while循环,从condition的条件队列中找到第一个需要被移动到阻塞队列的节点,即为被唤醒的节点。
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
        
        // 执行唤醒操作的逻辑方法
        final boolean transferForSignal(Node node) {
        // 当前节点的waitSattus != Node.CONDITION,则表示该线程已取消。直接返回false。
        // 从doSignal方法可知,返回false时,继续下一次循环,寻找下一个节点。
        	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            	return false;
            	
           // 如果当前节点没有取消,则调用AQS中的方法,将当前节点放入阻塞队列尾部
           // 这个地方就和await方法中的isOnSyncQueue(node)方法向对应
           // 注意:这里的p是node节点的前驱节点
 			Node p = enq(node);
        	int ws = p.waitStatus;
        	
        	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            	LockSupport.unpark(node.thread);
        	return true;
    	}

注意:signal()方法只是单纯的将Condition条件队列(即firstWaiterlastWaiter构成的单向链表)的第一个可唤醒的节点(不一定是头结点)转移到AQS的阻塞队列,并没有执行unpark()方法唤醒处于await状态的线程。真正唤醒线程的操作仍然是在lock.unlock()方法中完成的。

本文地址:https://blog.csdn.net/youngboy_su/article/details/107871242