欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java并发编程7–并发工具之Condition

程序员文章站 2022-07-05 10:58:27
文章目录ConditionCondition 的基本使用Condition 源码分析线程这块的一些工具类,基本都会以原理为主,通过分析别人代码的设计和实现,给自己提供积累一些方法和工具。Condition在前面学习 synchronized 的时候,有讲到 wait/notify 的基本使用,结合 synchronized 可以实现对线程的通信。既然 J.U.C 里面提供了锁的实现机制,那 J.U.C 里面有没有提供类似的线程通信的工具呢?于是发现了一个 Condition 工具类。Conditio...

更多内容请看:Java并发编程学习笔记


线程这块的一些工具类,基本都会以原理为主,通过分析别人代码的设计和实现,给自己提供积累一些方法和工具。

Condition

在前面学习 synchronized 的时候,有讲到 wait/notify 的基本使用,结合 synchronized 可以实现对线程的通信。既然 J.U.C 里面提供了锁的实现机制,那 J.U.C 里面有没有提供类似的线程通信的工具呢?于是发现了一个 Condition 工具类。

Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒。

Condition 的基本使用

ConditionWait

public class ConditionDemoWait implements Runnable {
		private Lock lock;
		private Condition condition;

		public ConditionDemoWait(Lock lock, Condition condition) {
			this.lock = lock;
			this.condition = condition;
		}
		@Override
		public void run() {
			System.out.println("begin -ConditionDemoWait");
			try {
				lock.lock();
				condition.await();
				System.out.println("end -ConditionDemoWait");
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				lock.unlock();
			}
		}
	}

ConditionSignal

public class ConditionDemoSignal implements Runnable {
		private Lock lock;
		private Condition condition;

		public ConditionDemoSignal(Lock lock, Condition condition) {
			this.lock = lock;
			this.condition = condition;
		}
		@Override
		public void run() {
			System.out.println("begin -ConditionDemoSignal");
			try {
				lock.lock();
				condition.signal();
				System.out.println("end -ConditionDemoSignal");
			} finally {
				lock.unlock();
			}
		}
	}

通过这个案例简单实现了 wait 和 notify 的功能,当调用await 方法后,当前线程会释放锁并等待,而其他线程调用condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行 unlock 释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。

所以,condition 中两个最重要的方法,一个是 await,一个是 signal 方法
await:把当前线程阻塞挂起
signal:唤醒阻塞的线程

Condition 源码分析

调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同步队列,在上面那个案例中,假如两个线程同时运行的话,那么 AQS 的队列可能是下面这种情况:
Java并发编程7–并发工具之Condition
那么这个时候 ThreadA 调用了 condition.await 方法,它做了什么事情呢?

condition.await

调用 Condition 的 await()方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()方法返回时,当前线程一定获取了Condition 相关联的锁。

public final void await() throws InterruptedException {
      if (Thread.interrupted()) // 表示await允许被中断
          throw new InterruptedException();
      // 创建一个新的节点,节点状态为condition,采用的数据结构依然是链表
      Node node = addConditionWaiter();
	  // 释放当前锁,得到锁的状态并唤醒AQS队列中的一个线程
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      // 如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞
      while (!isOnSyncQueue(node)) {// 第一次判断为false,因为线程已经释放了,所以肯定不在同步队了
          LockSupport.park(this);// 通过park操作挂起当前线程
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
      }
      // 当线程被唤醒后,会尝试来获取锁,如果acquireQueued返回false就表示拿到锁了。
      // interruptMode !=THROW_IE 表示这个线程没有成功将node入队,但signal执行了enq方法让其入队了
      //  将这个变量设置成 REINTERRUPT
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
          interruptMode = REINTERRUPT;
          // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点
      if (node.nextWaiter != null) // clean up if cancelled
          unlinkCancelledWaiters();
      if (interruptMode != 0)
          reportInterruptAfterWait(interruptMode);
  }

addConditionWaiter()
这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。 这里的队列不再是双向链表,而是单向链表;

private Node addConditionWaiter() {
	Node t = lastWaiter;
	// If lastWaiter is cancelled, clean out.
	// 如果 lastWaiter不等于空并且waitStatus 不等于CONDITION时,把这个节点从链表中移除
	if (t != null && t.waitStatus != Node.CONDITION) {
	    unlinkCancelledWaiters();
	    t = lastWaiter;
	}
	//构建一个Node,waitStatus=CONDITION这里的链表是一个单向的,所以相比AQS来说会简单很多
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
	if (t == null)
	    firstWaiter = node;
	else
	    t.nextWaiter = node;
	lastWaiter = node;
	return node;
}

执行完 addConditionWaiter 这个方法之后,就会产生一个这样的 condition 队列:
也就是释放锁了ThreadA会加入到这个队列;
Java并发编程7–并发工具之Condition
fullyRelease

fullRelease,就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零;

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();  // 获取到重复次数
        if (release(savedState)) {   // 释放锁并且唤醒下一个同步队列中的线程
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

图解分析

此时,同步队列会触发锁的释放和重新竞争。 ThreadB 获得了锁。
Java并发编程7–并发工具之Condition

condition.signal

await 方法会阻塞 ThreadA,然后 ThreadB 抢占到了锁获得了执行权限,这个时候在 ThreadB 中调用了 Condition的 signal()方法,将会唤醒在等待队列中节点。

Condition.signal

public final void signal() {
		// 先判断当前线程是否获得了锁,如果没有获得是没有释放这一说的,直接抛异常
		if (!isHeldExclusively())
			throw new IllegalMonitorStateException();
		// 获取到condition队列上的第一个节点
		Node first = firstWaiter;
		if (first != null)
			doSignal(first);
	}
	/**
	 * 对 condition 队列中从首部开始的第一个 condition 状态的节点, 执行 transferForSignal 操作,将 node
	 * 从 condition队列中转换到 AQS 队列中, 同时修改 AQS 队列中原先尾节点的状态
	 */
	private void doSignal(Node first) {
		do {
			//从 Condition 队列中删除 first 节点
			if ((firstWaiter = first.nextWaiter) == null)
				lastWaiter = null; // 将next节点设置为null
			first.nextWaiter = null;
		} while (!transferForSignal(first) && (first = firstWaiter) != null);
	}

AQS.transferForSignal
该方法先是 CAS 修改了节点状态,如果成功,就将这个节点放到 AQS 队列中,然后唤醒这个节点上的线程。此时,
那个节点就会在 await 方法中苏醒

图解分析
执行完 doSignal 以后,会把 condition 队列中的节点转移到 aqs 队列上, 逻辑结构图如下这个时候会判断 ThreadA 的 prev 节点也就是 head 节点的 waitStatus, 如果大于 0 或者设置 SIGNAL 失败,表示节点被设置成了 CANCELLED 状态。 这个时候会唤醒ThreadA 这个线程。 否则就基于 AQS 队列的机制来唤醒,也就是等到 ThreadB 释放锁之后来唤醒 ThreadA。

Java并发编程7–并发工具之Condition

Condition 总结

await 和 signal 的总结
前面的整个分解的图再通过一张整体的结构图来表述,线程 awaitThread 先通过 lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程 signalThread 通过 lock.lock()方法获取锁成功后调用了condition.signal 或者 signalAll 方法,使得线程awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。

Java并发编程7–并发工具之Condition

  • 阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队
    列,则自旋等待尝试获取锁
  • 释放:signal()后,节点会从 condition 队列移动到 AQS等待队列,则进入正常锁的获取流程

本文地址:https://blog.csdn.net/qq_19642249/article/details/107316456

相关标签: java并发编程