欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

(二十九) ReentrantLock

程序员文章站 2022-04-13 22:44:22
...

前言: 从准备面试的时候就开始时不时接触ReentrantLock,相关博客也看了不少,总是感觉不是很理解,还是自己没有动手主动理解过的原因吧,现在工作也找了,离职也快了,熟悉一下ReentrantLock。


参考博客:

1. ReentrantLock实现原理

2. 轻松学习java可重入锁(ReentrantLock)的实现原理

3.再谈重入锁--ReentrantLock

4.ReenTrantLock可重入锁(和synchronized的区别)总结

5.java多线程系列(四)---ReentrantLock的使用


demo: jiatai 的 ReentrantLock demo


1. 基本概念

简单来说就是ReentrantLock和Sychronized差不多,但是由于功能有拓展,所以更厉害。主要就是关注下厉害在哪里?

ReenTrantLock独有的能力:

1.      ReenTrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。

2.      ReenTrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。

3.      ReenTrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。



1.1 源码注释

源码注释中是这样解释的:

ReentrantLock是一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生。

Reentrant的构造器提供了fairness参数用于指定该lock是公平锁还是非公平锁。当设为true时,各线程互相争夺,一般是等待时间最长的线程获取访问锁的权限。 另外,这个锁并不能保证任何特定的访问顺序。比那些使用默认设置(即使用非公平锁的实现)的程序, 使用被许多线程访问的公平锁的程序可能显示较低的整体吞吐量(即较慢,或者慢得多),但在获取锁和缺少饥饿的保证上有更小的差异。 但请注意,锁的公平性并不能保证线程调度的公平性。 因此,使用一个公平锁可能会被许多线程其中之一连续获得多次,而其他活动线程这时就阻塞住,获取不到该锁
  还要注意,不计时的{@link #tryLock()}方法不会尊重公平的环境。如果锁是可用的,即使其他线程正在等待,tryLock()会成功

/**
 * A reentrant mutual exclusion {@link Lock} with the same basic
 * behavior and semantics as the implicit monitor lock accessed using
 * {@code synchronized} methods and statements, but with extended
 * capabilities.
 *
 * <p>A {@code ReentrantLock} is <em>owned</em> by the thread last
 * successfully locking, but not yet unlocking it. A thread invoking
 * {@code lock} will return, successfully acquiring the lock, when
 * the lock is not owned by another thread. The method will return
 * immediately if the current thread already owns the lock. This can
 * be checked using methods {@link #isHeldByCurrentThread}, and {@link
 * #getHoldCount}.
 *
 * <p>The constructor for this class accepts an optional
 * <em>fairness</em> parameter.  When set {@code true}, under
 * contention, locks favor granting access to the longest-waiting
 * thread.  Otherwise this lock does not guarantee any particular
 * access order.  Programs using fair locks accessed by many threads
 * may display lower overall throughput (i.e., are slower; often much
 * slower) than those using the default setting, but have smaller
 * variances in times to obtain locks and guarantee lack of
 * starvation. Note however, that fairness of locks does not guarantee
 * fairness of thread scheduling. Thus, one of many threads using a
 * fair lock may obtain it multiple times in succession while other
 * active threads are not progressing and not currently holding the
 * lock.
 * Also note that the untimed {@link #tryLock()} method does not
 * honor the fairness setting. It will succeed if the lock
 * is available even if other threads are waiting.
 *
 * <p>It is recommended practice to <em>always</em> immediately
 * follow a call to {@code lock} with a {@code try} block, most
 * typically in a before/after construction such as:
 *
 * <pre> {@code
 * class X {
 *   private final ReentrantLock lock = new ReentrantLock();
 *   // ...
 *
 *   public void m() {
 *     lock.lock();  // block until condition holds
 *     try {
 *       // ... method body
 *     } finally {
 *       lock.unlock()
 *     }
 *   }
 * }}</pre>
 *
 * <p>In addition to implementing the {@link Lock} interface, this
 * class defines a number of {@code public} and {@code protected}
 * methods for inspecting the state of the lock.  Some of these
 * methods are only useful for instrumentation and monitoring.
 *
 * <p>Serialization of this class behaves in the same way as built-in
 * locks: a deserialized lock is in the unlocked state, regardless of
 * its state when serialized.
 *
 * <p>This lock supports a maximum of 2147483647 recursive locks by
 * the same thread. Attempts to exceed this limit result in
 * {@link Error} throws from locking methods.
 *
 * @since 1.5
 * @author Doug Lea
 */


1.2 相关知识介绍

  • 可重入锁。可重入锁是指同一个线程可以多次获取同一把锁。ReentrantLock和synchronized都是可重入锁。
  • 可中断锁。可中断锁是指线程尝试获取锁的过程中,是否可以响应中断。synchronized是不可中断锁,而ReentrantLock则提供了中断功能。
  • 公平锁与非公平锁。公平锁是指多个线程同时尝试获取同一把锁时,获取锁的顺序按照线程达到的顺序,而非公平锁则允许线程“插队”。synchronized是非公平锁,而ReentrantLock的默认实现是非公平锁,但是也可以设置为公平锁。
  • CAS操作(CompareAndSwap)。CAS操作简单的说就是比较并交换。CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。” Java并发包(java.util.concurrent)中大量使用了CAS操作,涉及到并发的地方都调用了sun.misc.Unsafe类方法进行CAS操作。


2. 使用

2.1 简单使用

类似于sychronized

package com.example.demo_29_reentrantlock;

import java.util.concurrent.locks.ReentrantLock;

public class MyClass {
    static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args){
        new ReentrantLockThread("jiatai1").start();
        new ReentrantLockThread("jiatai2").start();
        new ReentrantLockThread("jiatai3").start();
        new ReentrantLockThread("jiatai4").start();
        new ReentrantLockThread("jiatai5").start();
    }

    private static void f1(){
        lock.lock();
        try {
            System.out.println("do something" + Thread.currentThread());
            Thread.sleep(2000);
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    static class ReentrantLockThread extends Thread{
        ReentrantLockThread(String s){
            super(s);
        }
        @Override
        public void run() {
            super.run();
            f1();
        }
    }
}

对应log:

(二十九) ReentrantLock

默认实现是非公平锁,但是看起来锁还是依据先后顺序获取的,非公平锁到底在何种情况下体现呢?是否是已经排好队的就没有资格插队呢?

从我看来非公平锁是排好队的就没机会插队了,新来的还没排队的可以插队。


2.2 Condition

condition有如下对应api

(二十九) ReentrantLock

await*对应于Object.waitsignal对应于Object.notifysignalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆。

每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。

条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。

条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,

一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式

的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像 Object.wait 做的那样。

参考博客用Condition写了个生产者-消费者的例子,总感觉没有将Conditon的作用发挥出来,还是用的像是Object的wait/notify一样,Condition的优势应该在于可以唤醒特定的某几个线程。

ProductQueue.java(生产队列)

package com.example.demo_29_reentrantlock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProductQueue<T> {

    private final T[] items;

    private final Lock lock = new ReentrantLock();

    private Condition notFull = lock.newCondition();

    private Condition notEmpty = lock.newCondition();

    //
    private int head, tail, count;

    public ProductQueue(int maxSize) {
        items = (T[]) new Object[maxSize];
    }

    public ProductQueue() {
        this(10);
    }

    public void put(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == getCapacity()) {
                notFull.await();
            }
            items[tail] = t;
            if (++tail == getCapacity()) {
                tail = 0;
            }
            ++count;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T ret = items[head];
            items[head] = null;//GC
            //
            if (++head == getCapacity()) {
                head = 0;
            }
            --count;
            notFull.signalAll();
            return ret;
        } finally {
            lock.unlock();
        }
    }

    public int getCapacity() {
        return items.length;
    }

    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

}

创建生产者消费者进行工作:

ProductQueueTest.java

package com.example.demo_29_reentrantlock;

public class ProductQueueTest {
    private static ProductQueue productQueue = new ProductQueue<Integer>();
    public static void main(String[] args) {
        new FactoryThread().start();
        new ConsumeThread("jiatai1").start();
        new ConsumeThread("jiatai2").start();
        new ConsumeThread("jiatai3").start();
        new ConsumeThread("jiatai4").start();
    }

    static class FactoryThread extends Thread{
        @Override
        public void run() {
            super.run();
            for(int i = 0; i < 20; i++) {
                try {
                    productQueue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class ConsumeThread extends Thread{
        ConsumeThread(String s){
            super(s);
        }
        @Override
        public void run() {
            super.run();
            for(int i = 0; i < 5; i++) {
                try {
                    System.out.println(Thread.currentThread() + " get " + productQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

对应log(样本量比较小,扩展到100可以看到4个消费者线程交替工作):

Thread[jiatai2,5,main] get 1
Thread[jiatai2,5,main] get 2
Thread[jiatai2,5,main] get 3
Thread[jiatai2,5,main] get 4
Thread[jiatai2,5,main] get 5
Thread[jiatai1,5,main] get 0
Thread[jiatai3,5,main] get 7
Thread[jiatai3,5,main] get 8
Thread[jiatai3,5,main] get 9
Thread[jiatai3,5,main] get 10
Thread[jiatai3,5,main] get 11
Thread[jiatai1,5,main] get 12
Thread[jiatai1,5,main] get 13
Thread[jiatai1,5,main] get 14
Thread[jiatai1,5,main] get 15
Thread[jiatai4,5,main] get 6
Thread[jiatai4,5,main] get 16
Thread[jiatai4,5,main] get 17
Thread[jiatai4,5,main] get 18
Thread[jiatai4,5,main] get 19


2.3  lockInterruptibly

用了codota真是太方便了=-=

(二十九) ReentrantLock

作者:郭无心
链接:https://www.zhihu.com/question/36771163/answer/68974735
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

lockInterruptiblypublic void lockInterruptibly() throws InterruptedException
1)如果当前线程未被中断,则获取锁。
2)如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。
3)如果当前线程已经保持此锁,则将保持计数加 1,并且该方法立即返回。
4)如果锁被另一个线程保持,则出于线程调度目的,禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态: 1)锁由当前线程获得;或者 2)其他某个线程中断当前线程。
5)如果当前线程获得该锁,则将锁保持计数设置为 1。
如果当前线程: 1)在进入此方法时已经设置了该线程的中断状态;或者 2)在等待获取锁的同时被中断。 则抛出 InterruptedException,并且清除当前线程的已中断状态。
6)在此实现中,因为此方法是一个显式中断点,所以要优先考虑响应中断,而不是响应锁的普通获取或重入获取。 指定者: 接口 Lock 中的 lockInterruptibly抛出: InterruptedException 如果当前线程已中断。


3. 对照使用学习源码

先介绍下ReentrantLock内部锁包含的sync内部类继承关系,如下图所示

(二十九) ReentrantLock          (二十九) ReentrantLock

3.1 构造函数

 /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

ReentrantLock构造函数分为无参和有参的,无参的默认时候使用的非公平锁,有参并为true的时候才用的公平锁。


3.2 lock()、NonfairSync & FairSync

其实ReentrantLock的lock()方法继而还是会调用到NonefairSync或者FairSync的lock方法里

/**
     * Acquires the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and returns
     * immediately, setting the lock hold count to one.
     *
     * <p>If the current thread already holds the lock then the hold
     * count is incremented by one and the method returns immediately.
     *
     * <p>If the lock is held by another thread then the
     * current thread becomes disabled for thread scheduling
     * purposes and lies dormant until the lock has been acquired,
     * at which time the lock hold count is set to one.
     */
    public void lock() {
        sync.lock();
    }

3.2.1 NonfairSync

这时候我们先看下默认实现NonfairSync的lock方法:

     /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

首先我们看到NonefairSync先判断了一下当前state是否是0,0意味着当前锁还未被任何线程获取,这时就将state更新为1,表示锁已经被占据了。setExclusiveOwnerThread表示占据了锁的独占访问权限。

 /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

否则走到acquire(1)中去

/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire做了两件事情:1.获取锁 2.如果获取不成功,则将线程排个队。

/**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

这边是代表的获取锁的操作,state是0表示锁仍可以获取,重复之前分析的获取锁的操作,如果当前线程就是占据锁的线程,则将state+1,返回true。这里就是很多博客中提及的一个线程第一次占据锁后state为1,当第二次获取锁后state变为2的原因。

返回false的时候就走到下面的acquireQueued流程中去:

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                U.putObject(node, Node.PREV, oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中的addWaiter就是维护一条双向链表,将新添加的Node放在链表的尾巴上。initializeSyncQueue对应的就是在链表为空的时候初始化个默认头结点。

/**
     * Initializes head and tail fields on first contention.
     */
    private final void initializeSyncQueue() {
        Node h;
        if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
            tail = h;
    }

至于acquireQueued方法:

 /*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes.  Each is mostly the same, but annoyingly
     * different.  Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

可以观察到其中包含了1个无限循环体,循环直至节点的prev节点是head并且可以成功获取到锁后停止,接着将自己作为头结点。

3.2.2 FairSync

这时候我们再看下FairSync,并着重关注一下与NonfairSync的异同。

/**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

FairSync的lock方法是不像NonFairSync方法一样先很流氓地看下锁能不能获取,能就立刻插个队,而是和NonfairSync第二步一样acquire(1)一下。流程差不多,state为0表示锁可以获取,接着会判断一下该线程是不是排在第二个(等待锁时间最长,第一个head为默认初始化的),是的话就获取锁,毕竟公平第一嘛;走到else里和NonfairSync差不多state+acquires。

主要看下hasQueuedPredecessors方法:

/**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     * <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread()
     *   && hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread.  Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire).  For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     * <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }


3.3 unlock

/**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        sync.release(1);
    }

继而调用到sync的release(int arg)方法

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

首先会调用tryRelease更新state来体现锁的释放,和锁的获取对应。

/**
     * Attempts to set the state to reflect a release in exclusive
     * mode.
     *
     * <p>This method is always invoked by the thread performing release.
     *
     * <p>The default implementation throws
     * {@link UnsupportedOperationException}.
     *
     * @param arg the release argument. This value is always the one
     *        passed to a release method, or the current state value upon
     *        entry to a condition wait.  The value is otherwise
     *        uninterpreted and can represent anything you like.
     * @return {@code true} if this object is now in a fully released
     *         state, so that any waiting threads may attempt to acquire;
     *         and {@code false} otherwise.
     * @throws IllegalMonitorStateException if releasing would place this
     *         synchronizer in an illegal state. This exception must be
     *         thrown in a consistent fashion for synchronization to work
     *         correctly.
     * @throws UnsupportedOperationException if exclusive mode is not supported
     */ 
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

接着如果释放锁state更新成功,则会继续往下执行,如果还记得的话,我们将第一个等待锁的线程加入链表队列的时候初始化了一个head

 /**
     * Initializes head and tail fields on first contention.
     */
    private final void initializeSyncQueue() {
        Node h;
        if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
            tail = h;
    }

当其在for循环中获取锁的时候会将其作为head

 final boolean acquireQueued(final Node node, int arg) {  
       try {  
           boolean interrupted = false;  
           for (;;) {  
               final Node p = node.predecessor();  
               if (p == head && tryAcquire(arg)) {  
                   setHead(node);  
                   p.next = null; // help GC  
                   return interrupted;  
               }  
               if (shouldParkAfterFailedAcquire(p, node) &&  
                   parkAndCheckInterrupt())  
                   interrupted = true;  
           }  
       } catch (Throwable t) {  
           cancelAcquire(node);  
           throw t;  
       }  
   }  

继而释放的时候用的是Node的waitStatus是0不会继续往下走(Node.EXCLUSIVE作为默认参数初始化而没有初始化waitStatus)


3.4 newCondition()

/**
     * Returns a {@link Condition} instance for use with this
     * {@link Lock} instance.
     *
     * <p>The returned {@link Condition} instance supports the same
     * usages as do the {@link Object} monitor methods ({@link
     * Object#wait() wait}, {@link Object#notify notify}, and {@link
     * Object#notifyAll notifyAll}) when used with the built-in
     * monitor lock.
     *
     * <ul>
     *
     * <li>If this lock is not held when any of the {@link Condition}
     * {@linkplain Condition#await() waiting} or {@linkplain
     * Condition#signal signalling} methods are called, then an {@link
     * IllegalMonitorStateException} is thrown.
     *
     * <li>When the condition {@linkplain Condition#await() waiting}
     * methods are called the lock is released and, before they
     * return, the lock is reacquired and the lock hold count restored
     * to what it was when the method was called.
     *
     * <li>If a thread is {@linkplain Thread#interrupt interrupted}
     * while waiting then the wait will terminate, an {@link
     * InterruptedException} will be thrown, and the thread's
     * interrupted status will be cleared.
     *
     * <li>Waiting threads are signalled in FIFO order.
     *
     * <li>The ordering of lock reacquisition for threads returning
     * from waiting methods is the same as for threads initially
     * acquiring the lock, which is in the default case not specified,
     * but for <em>fair</em> locks favors those threads that have been
     * waiting the longest.
     *
     * </ul>
     *
     * @return the Condition object
     */
    public Condition newCondition() {
        return sync.newCondition();
    }

上面注释提及了:

  • condition的用法和object的wait/notify/notifyAll类似
  • 等待线程是以FIFO 的顺序被 signalled
 final ConditionObject newCondition() {
            return new ConditionObject();
        }

ConditionObject是AQS(AbstractQueuedSynchronizer)内部一个实现了Condition接口的内部类。

(二十九) ReentrantLock

我们之前用的就是其中的await和signal接口。

3.4.1 await

         /**
         * Implements interruptible condition wait.
         * <ol>
         * <li>If current thread is interrupted, throw InterruptedException.
         * <li>Save lock state returned by {@link #getState}.
         * <li>Invoke {@link #release} with saved state as argument,
         *     throwing IllegalMonitorStateException if it fails.
         * <li>Block until signalled or interrupted.
         * <li>Reacquire by invoking specialized version of
         *     {@link #acquire} with saved state as argument.
         * <li>If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

首先判断一下是否中断,是的话抛出中断异常;接着添加waiter节点到等待队列中去。

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }

            Node node = new Node(Node.CONDITION);

            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

然后释放持有的锁,并保存之前的state。

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }

release(savedState)之前我们在unlock方法中接触到过,其实就是释放该线程持有的锁。

 /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

这次由于Node的waitStatus是Condition即-2,所以会走unparkSuccessor(h)流程。

 /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
这边节点操作不是很懂,这里将node后面的节点,如果是空或者waitStatus是CANCELLED,那就置为空,并且从该节点往前遍历,看waitStatus什么时候小于0,赋值给s。最后唤醒s对应的Thread。

LockSupport在demo 9 futureTask里接触过,当时还举了下面的例子,这边unpark是唤醒s对应的Thread。

package com.example.demo_9_locksupport;

import java.util.concurrent.locks.LockSupport;

public class MyClass {
    public static void main(String[] args){
        Thread boy = new Thread(){
            @Override
            public void run() {
                super.run();
                System.out.println("he is playing game");
                LockSupport.park();
                System.out.println("he must give up playing game and shop with his girl friend");
            }
        };
        boy.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("his girl friend calls him to shop with her");
        LockSupport.unpark(boy);
    }
}

释放锁之后会调用LockSupport.park(this);将自己阻塞住。这时我们再看下signal方法。


3.4.2 signal

// public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

接着看下doSignal方法,循环遍历知道成功唤醒一个线程获取到锁。

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

如果node前面一个节点的waitStatus是CANCELLED则唤醒对应Node所对应的Thread。这边感觉不懂=-=

这边unpark对应前面await里的park,是await里的流程可以继续往下走了。简单看来就是唤醒后重新获取锁,这边 获取锁的流程也和lock是一样的。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

感觉好晕,没梳理清楚,应该是没有对照代码把链表图画出来的缘故,待续。。。


3.5 lockInterruptibly

感觉lockInterruptibly这个方法如果不考虑中断的话和lock一样用。如下源码也证实了这一点,在获取锁之前先判断下当前线程是否处于中断状态,如果是抛出中断异常,否则和lock类似地获取锁,只不过每次尝试获取锁的时候都会判断一下是否中断。中断就抛出中断异常。

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
 /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

/**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }


相关标签: ReentrantLock