(二十九) ReentrantLock
前言: 从准备面试的时候就开始时不时接触ReentrantLock,相关博客也看了不少,总是感觉不是很理解,还是自己没有动手主动理解过的原因吧,现在工作也找了,离职也快了,熟悉一下ReentrantLock。
参考博客:
2. 轻松学习java可重入锁(ReentrantLock)的实现原理
4.ReenTrantLock可重入锁(和synchronized的区别)总结
5.java多线程系列(四)---ReentrantLock的使用
demo: jiatai 的 ReentrantLock demo
1. 基本概念
简单来说就是ReentrantLock和Sychronized差不多,但是由于功能有拓展,所以更厉害。主要就是关注下厉害在哪里?
ReenTrantLock独有的能力:
1. ReenTrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。
2. ReenTrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
3. ReenTrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。
1.1 源码注释
源码注释中是这样解释的:
ReentrantLock是一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生。
Reentrant的构造器提供了fairness参数用于指定该lock是公平锁还是非公平锁。当设为true时,各线程互相争夺,一般是等待时间最长的线程获取访问锁的权限。 另外,这个锁并不能保证任何特定的访问顺序。比那些使用默认设置(即使用非公平锁的实现)的程序, 使用被许多线程访问的公平锁的程序可能显示较低的整体吞吐量(即较慢,或者慢得多),但在获取锁和缺少饥饿的保证上有更小的差异。 但请注意,锁的公平性并不能保证线程调度的公平性。 因此,使用一个公平锁可能会被许多线程其中之一连续获得多次,而其他活动线程这时就阻塞住,获取不到该锁。
还要注意,不计时的{@link #tryLock()}方法不会尊重公平的环境。如果锁是可用的,即使其他线程正在等待,tryLock()会成功。
/**
* A reentrant mutual exclusion {@link Lock} with the same basic
* behavior and semantics as the implicit monitor lock accessed using
* {@code synchronized} methods and statements, but with extended
* capabilities.
*
* <p>A {@code ReentrantLock} is <em>owned</em> by the thread last
* successfully locking, but not yet unlocking it. A thread invoking
* {@code lock} will return, successfully acquiring the lock, when
* the lock is not owned by another thread. The method will return
* immediately if the current thread already owns the lock. This can
* be checked using methods {@link #isHeldByCurrentThread}, and {@link
* #getHoldCount}.
*
* <p>The constructor for this class accepts an optional
* <em>fairness</em> parameter. When set {@code true}, under
* contention, locks favor granting access to the longest-waiting
* thread. Otherwise this lock does not guarantee any particular
* access order. Programs using fair locks accessed by many threads
* may display lower overall throughput (i.e., are slower; often much
* slower) than those using the default setting, but have smaller
* variances in times to obtain locks and guarantee lack of
* starvation. Note however, that fairness of locks does not guarantee
* fairness of thread scheduling. Thus, one of many threads using a
* fair lock may obtain it multiple times in succession while other
* active threads are not progressing and not currently holding the
* lock.
* Also note that the untimed {@link #tryLock()} method does not
* honor the fairness setting. It will succeed if the lock
* is available even if other threads are waiting.
*
* <p>It is recommended practice to <em>always</em> immediately
* follow a call to {@code lock} with a {@code try} block, most
* typically in a before/after construction such as:
*
* <pre> {@code
* class X {
* private final ReentrantLock lock = new ReentrantLock();
* // ...
*
* public void m() {
* lock.lock(); // block until condition holds
* try {
* // ... method body
* } finally {
* lock.unlock()
* }
* }
* }}</pre>
*
* <p>In addition to implementing the {@link Lock} interface, this
* class defines a number of {@code public} and {@code protected}
* methods for inspecting the state of the lock. Some of these
* methods are only useful for instrumentation and monitoring.
*
* <p>Serialization of this class behaves in the same way as built-in
* locks: a deserialized lock is in the unlocked state, regardless of
* its state when serialized.
*
* <p>This lock supports a maximum of 2147483647 recursive locks by
* the same thread. Attempts to exceed this limit result in
* {@link Error} throws from locking methods.
*
* @since 1.5
* @author Doug Lea
*/
1.2 相关知识介绍
- 可重入锁。可重入锁是指同一个线程可以多次获取同一把锁。ReentrantLock和synchronized都是可重入锁。
- 可中断锁。可中断锁是指线程尝试获取锁的过程中,是否可以响应中断。synchronized是不可中断锁,而ReentrantLock则提供了中断功能。
- 公平锁与非公平锁。公平锁是指多个线程同时尝试获取同一把锁时,获取锁的顺序按照线程达到的顺序,而非公平锁则允许线程“插队”。synchronized是非公平锁,而ReentrantLock的默认实现是非公平锁,但是也可以设置为公平锁。
- CAS操作(CompareAndSwap)。CAS操作简单的说就是比较并交换。CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。” Java并发包(java.util.concurrent)中大量使用了CAS操作,涉及到并发的地方都调用了sun.misc.Unsafe类方法进行CAS操作。
2. 使用
2.1 简单使用
类似于sychronized
package com.example.demo_29_reentrantlock;
import java.util.concurrent.locks.ReentrantLock;
public class MyClass {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args){
new ReentrantLockThread("jiatai1").start();
new ReentrantLockThread("jiatai2").start();
new ReentrantLockThread("jiatai3").start();
new ReentrantLockThread("jiatai4").start();
new ReentrantLockThread("jiatai5").start();
}
private static void f1(){
lock.lock();
try {
System.out.println("do something" + Thread.currentThread());
Thread.sleep(2000);
}catch (Exception ex){
ex.printStackTrace();
}finally {
lock.unlock();
}
}
static class ReentrantLockThread extends Thread{
ReentrantLockThread(String s){
super(s);
}
@Override
public void run() {
super.run();
f1();
}
}
}
对应log:
默认实现是非公平锁,但是看起来锁还是依据先后顺序获取的,非公平锁到底在何种情况下体现呢?是否是已经排好队的就没有资格插队呢?
从我看来非公平锁是排好队的就没机会插队了,新来的还没排队的可以插队。
2.2 Condition
condition有如下对应api
每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。
条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。
条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,
一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式
的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像
Object.wait
做的那样。
参考博客用Condition写了个生产者-消费者的例子,总感觉没有将Conditon的作用发挥出来,还是用的像是Object的wait/notify一样,Condition的优势应该在于可以唤醒特定的某几个线程。
ProductQueue.java(生产队列)
package com.example.demo_29_reentrantlock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProductQueue<T> {
private final T[] items;
private final Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
//
private int head, tail, count;
public ProductQueue(int maxSize) {
items = (T[]) new Object[maxSize];
}
public ProductQueue() {
this(10);
}
public void put(T t) throws InterruptedException {
lock.lock();
try {
while (count == getCapacity()) {
notFull.await();
}
items[tail] = t;
if (++tail == getCapacity()) {
tail = 0;
}
++count;
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
T ret = items[head];
items[head] = null;//GC
//
if (++head == getCapacity()) {
head = 0;
}
--count;
notFull.signalAll();
return ret;
} finally {
lock.unlock();
}
}
public int getCapacity() {
return items.length;
}
public int size() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
创建生产者消费者进行工作:
ProductQueueTest.java
package com.example.demo_29_reentrantlock;
public class ProductQueueTest {
private static ProductQueue productQueue = new ProductQueue<Integer>();
public static void main(String[] args) {
new FactoryThread().start();
new ConsumeThread("jiatai1").start();
new ConsumeThread("jiatai2").start();
new ConsumeThread("jiatai3").start();
new ConsumeThread("jiatai4").start();
}
static class FactoryThread extends Thread{
@Override
public void run() {
super.run();
for(int i = 0; i < 20; i++) {
try {
productQueue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class ConsumeThread extends Thread{
ConsumeThread(String s){
super(s);
}
@Override
public void run() {
super.run();
for(int i = 0; i < 5; i++) {
try {
System.out.println(Thread.currentThread() + " get " + productQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
对应log(样本量比较小,扩展到100可以看到4个消费者线程交替工作):
Thread[jiatai2,5,main] get 1
Thread[jiatai2,5,main] get 2
Thread[jiatai2,5,main] get 3
Thread[jiatai2,5,main] get 4
Thread[jiatai2,5,main] get 5
Thread[jiatai1,5,main] get 0
Thread[jiatai3,5,main] get 7
Thread[jiatai3,5,main] get 8
Thread[jiatai3,5,main] get 9
Thread[jiatai3,5,main] get 10
Thread[jiatai3,5,main] get 11
Thread[jiatai1,5,main] get 12
Thread[jiatai1,5,main] get 13
Thread[jiatai1,5,main] get 14
Thread[jiatai1,5,main] get 15
Thread[jiatai4,5,main] get 6
Thread[jiatai4,5,main] get 16
Thread[jiatai4,5,main] get 17
Thread[jiatai4,5,main] get 18
Thread[jiatai4,5,main] get 19
2.3 lockInterruptibly
用了codota真是太方便了=-=链接:https://www.zhihu.com/question/36771163/answer/68974735
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
1)如果当前线程未被中断,则获取锁。
2)如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
3)如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。
4)如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态: 1)锁由当前线程获得;或者 2)其他某个线程中断当前线程。
5)如果当前线程获得该锁,则将锁保持计数设置为 1。
如果当前线程: 1)在进入此方法时已经设置了该线程的中断状态;或者 2)在等待获取锁的同时被中断。 则抛出 InterruptedException,并且清除当前线程的已中断状态。
6)在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或重入获取。 指定者: 接口 Lock 中的 lockInterruptibly抛出: InterruptedException 如果当前线程已中断。
3. 对照使用学习源码
先介绍下ReentrantLock内部锁包含的sync内部类继承关系,如下图所示
3.1 构造函数
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock构造函数分为无参和有参的,无参的默认时候使用的非公平锁,有参并为true的时候才用的公平锁。
3.2 lock()、NonfairSync & FairSync
其实ReentrantLock的lock()方法继而还是会调用到NonefairSync或者FairSync的lock方法里
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {
sync.lock();
}
3.2.1 NonfairSync
这时候我们先看下默认实现NonfairSync的lock方法:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
首先我们看到NonefairSync先判断了一下当前state是否是0,0意味着当前锁还未被任何线程获取,这时就将state更新为1,表示锁已经被占据了。setExclusiveOwnerThread表示占据了锁的独占访问权限。
/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
否则走到acquire(1)中去
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire做了两件事情:1.获取锁 2.如果获取不成功,则将线程排个队。
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这边是代表的获取锁的操作,state是0表示锁仍可以获取,重复之前分析的获取锁的操作,如果当前线程就是占据锁的线程,则将state+1,返回true。这里就是很多博客中提及的一个线程第一次占据锁后state为1,当第二次获取锁后state变为2的原因。
返回false的时候就走到下面的acquireQueued流程中去:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中的addWaiter就是维护一条双向链表,将新添加的Node放在链表的尾巴上。initializeSyncQueue对应的就是在链表为空的时候初始化个默认头结点。
/**
* Initializes head and tail fields on first contention.
*/
private final void initializeSyncQueue() {
Node h;
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
tail = h;
}
至于acquireQueued方法:
/*
* Various flavors of acquire, varying in exclusive/shared and
* control modes. Each is mostly the same, but annoyingly
* different. Only a little bit of factoring is possible due to
* interactions of exception mechanics (including ensuring that we
* cancel if tryAcquire throws exception) and other control, at
* least not without hurting performance too much.
*/
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
可以观察到其中包含了1个无限循环体,循环直至节点的prev节点是head并且可以成功获取到锁后停止,接着将自己作为头结点。
3.2.2 FairSync
这时候我们再看下FairSync,并着重关注一下与NonfairSync的异同。
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
FairSync的lock方法是不像NonFairSync方法一样先很流氓地看下锁能不能获取,能就立刻插个队,而是和NonfairSync第二步一样acquire(1)一下。流程差不多,state为0表示锁可以获取,接着会判断一下该线程是不是排在第二个(等待锁时间最长,第一个head为默认初始化的),是的话就获取锁,毕竟公平第一嘛;走到else里和NonfairSync差不多state+acquires。
主要看下hasQueuedPredecessors方法:
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*
* <p>An invocation of this method is equivalent to (but may be
* more efficient than):
* <pre> {@code
* getFirstQueuedThread() != Thread.currentThread()
* && hasQueuedThreads()}</pre>
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
*
* <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
3.3 unlock
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
继而调用到sync的release(int arg)方法
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先会调用tryRelease更新state来体现锁的释放,和锁的获取对应。
/**
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this object is now in a fully released
* state, so that any waiting threads may attempt to acquire;
* and {@code false} otherwise.
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
接着如果释放锁state更新成功,则会继续往下执行,如果还记得的话,我们将第一个等待锁的线程加入链表队列的时候初始化了一个head
/**
* Initializes head and tail fields on first contention.
*/
private final void initializeSyncQueue() {
Node h;
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
tail = h;
}
当其在for循环中获取锁的时候会将其作为head
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
继而释放的时候用的是Node的waitStatus是0不会继续往下走(Node.EXCLUSIVE作为默认参数初始化而没有初始化waitStatus)
3.4 newCondition()
/**
* Returns a {@link Condition} instance for use with this
* {@link Lock} instance.
*
* <p>The returned {@link Condition} instance supports the same
* usages as do the {@link Object} monitor methods ({@link
* Object#wait() wait}, {@link Object#notify notify}, and {@link
* Object#notifyAll notifyAll}) when used with the built-in
* monitor lock.
*
* <ul>
*
* <li>If this lock is not held when any of the {@link Condition}
* {@linkplain Condition#await() waiting} or {@linkplain
* Condition#signal signalling} methods are called, then an {@link
* IllegalMonitorStateException} is thrown.
*
* <li>When the condition {@linkplain Condition#await() waiting}
* methods are called the lock is released and, before they
* return, the lock is reacquired and the lock hold count restored
* to what it was when the method was called.
*
* <li>If a thread is {@linkplain Thread#interrupt interrupted}
* while waiting then the wait will terminate, an {@link
* InterruptedException} will be thrown, and the thread's
* interrupted status will be cleared.
*
* <li>Waiting threads are signalled in FIFO order.
*
* <li>The ordering of lock reacquisition for threads returning
* from waiting methods is the same as for threads initially
* acquiring the lock, which is in the default case not specified,
* but for <em>fair</em> locks favors those threads that have been
* waiting the longest.
*
* </ul>
*
* @return the Condition object
*/
public Condition newCondition() {
return sync.newCondition();
}
上面注释提及了:
- condition的用法和object的wait/notify/notifyAll类似
- 等待线程是以FIFO 的顺序被 signalled
final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject是AQS(AbstractQueuedSynchronizer)内部一个实现了Condition接口的内部类。
我们之前用的就是其中的await和signal接口。
3.4.1 await
/**
* Implements interruptible condition wait.
* <ol>
* <li>If current thread is interrupted, throw InterruptedException.
* <li>Save lock state returned by {@link #getState}.
* <li>Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li>Block until signalled or interrupted.
* <li>Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li>If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
首先判断一下是否中断,是的话抛出中断异常;接着添加waiter节点到等待队列中去。
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
然后释放持有的锁,并保存之前的state。
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
release(savedState)之前我们在unlock方法中接触到过,其实就是释放该线程持有的锁。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这次由于Node的waitStatus是Condition即-2,所以会走unparkSuccessor(h)流程。
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这边节点操作不是很懂,这里将node后面的节点,如果是空或者waitStatus是CANCELLED,那就置为空,并且从该节点往前遍历,看waitStatus什么时候小于0,赋值给s。最后唤醒s对应的Thread。LockSupport在demo 9 futureTask里接触过,当时还举了下面的例子,这边unpark是唤醒s对应的Thread。
package com.example.demo_9_locksupport;
import java.util.concurrent.locks.LockSupport;
public class MyClass {
public static void main(String[] args){
Thread boy = new Thread(){
@Override
public void run() {
super.run();
System.out.println("he is playing game");
LockSupport.park();
System.out.println("he must give up playing game and shop with his girl friend");
}
};
boy.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("his girl friend calls him to shop with her");
LockSupport.unpark(boy);
}
}
释放锁之后会调用LockSupport.park(this);将自己阻塞住。这时我们再看下signal方法。
3.4.2 signal
// public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
接着看下doSignal方法,循环遍历知道成功唤醒一个线程获取到锁。
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
如果node前面一个节点的waitStatus是CANCELLED则唤醒对应Node所对应的Thread。这边感觉不懂=-=
这边unpark对应前面await里的park,是await里的流程可以继续往下走了。简单看来就是唤醒后重新获取锁,这边 获取锁的流程也和lock是一样的。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
感觉好晕,没梳理清楚,应该是没有对照代码把链表图画出来的缘故,待续。。。
3.5 lockInterruptibly
感觉lockInterruptibly这个方法如果不考虑中断的话和lock一样用。如下源码也证实了这一点,在获取锁之前先判断下当前线程是否处于中断状态,如果是抛出中断异常,否则和lock类似地获取锁,只不过每次尝试获取锁的时候都会判断一下是否中断。中断就抛出中断异常。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
上一篇: 二十九,去打酒
下一篇: windows命令行工具cmder安装
推荐阅读
-
AbstractQueuedSynchronizer源码分析(ReentrantLock锁的实现)
-
Java并发——结合CountDownLatch源码、Semaphore源码及ReentrantLock源码来看AQS原理
-
Java并发之ReentrantLock类源码解析
-
ReentrantLock等待通知机制Condition介绍
-
Linux学习之CentOS(二十九)--Linux网卡高级命令、IP别名及多网卡绑定的方法
-
ReentrantLock锁 源码分析
-
教你完全理解ReentrantLock重入锁
-
多线程通信的两种方式? (可重入锁ReentrantLock和Object)
-
联想激光打印机二十九例故障检修实例
-
jQuery 源码解析(二十九) 样式操作模块 尺寸详解