欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发编程--ReentrantLock

程序员文章站 2022-05-04 20:53:24
...

并发编程–ReentrantLock

前言

Java常规的多线程间协作的控制为通过synchronized关键字来实现。但是,synchronized并非使用于所有的并发场景,并且执行效率很低(只要存在于线程间切换时上下文切换)。所以在JDK-1.5版本之后引入了继承自Lock接口的ReentrantLock为首的很多对象锁。相比于synchronized来说,使用更加灵活。所以下面,主要以ReentrantLock为突破口来描述下ReentrantLock的具体实现流程以及其中的包含的数据结构和设计思想的个人理解。我会以图+文字的形式来尽可能的描述清楚整套运行原理。

类图

并发编程--ReentrantLock

代码感知

下面先举例下ReentrantLock的使用和不使用的效果,以达到初步的感知的目的。

不使用ReentrantLock加锁的代码:

package com.example.demo.reentrantLock;

import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReentrantLock Demo 代码
 *
 * @author fanyuanhang
 * @date 2020年06月22日 13:43:27
 */
public class LockDemo {
    /**
     * thread pool
     */
    private static ExecutorService threadPool = new ThreadPoolExecutor(10, 15
            , 0L, TimeUnit.MILLISECONDS
            , new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());

    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {

        for (int i = 0; i < 1000; i++) {
            threadPool.submit(() -> add());
        }
        threadPool.shutdown();
        Thread.sleep(3000);
        System.out.println(count);
    }

    /**
     * add
     */
    public static void add() {
        try {
            count++;
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}

执行效果:

输出:995

分析:由于count变量为共享资源,每一个线程池中的线程获取这个变量时可能存在内存不可见的问题。所以最后的结果并非我们预期的1000。了解synchronized和voliate关键字的同学应该知道,这两种方法都可以解决内存不可见的问题。那么,如何使用ReentrantLock达到同样的效果呢?

使用ReentrantLock加锁的代码:

package com.example.demo.reentrantLock;

import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReentrantLock Demo 代码
 *
 * @author fanyuanhang
 * @date 2020年06月22日 13:43:27
 */
public class LockDemo {
    /**
     * lock object
     */
    private static Lock lock = new ReentrantLock();

    /**
     * thread pool
     */
    private static ExecutorService threadPool = new ThreadPoolExecutor(10, 15
            , 0L, TimeUnit.MILLISECONDS
            , new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());

    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {

        for (int i = 0; i < 1000; i++) {
            threadPool.submit(() -> add());
        }
        threadPool.shutdown();
        Thread.sleep(3000);
        System.out.println(count);
    }

    /**
     * add
     */
    public static void add() {
        lock.lock();
        try {
            count++;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

执行效果:

输出:1000

分析:可见,加了ReentrantLock锁后也可以达到同样线程间协作控制的作用。

原理剖析

下面,我们主要看一下ReentrantLock的实现原理,以及内部的运行流程。这里,我想了想还是以一些自己画的流程图来进行图文结合的说明。但是,实操才是掌握的关键!!!

分析入口

对于源码的分析,一般我们需要找到入口才能事半功倍。那么,ReentrantLock的源码入口是哪里呢?结合上面的类图,我们可以看到ReentrantLock继承自Lock接口。那么Lock接口中的方法就是ReentrantLock实现类中的核心方法,我们就以Lock中的几个核心方法为入口来进行分析。

sync.lock()方法

顾名思义,lock方法的作用是加锁。那么在ReentrantLock中具体是如何实现的?我们进入ReentrantLock中的lock方法:

/**
 * Acquires the lock.
 *
 * <p>Acquires the lock if it is not held by another thread and returns
 * immediately, setting the lock hold count to one.
 *
 * <p>If the current thread already holds the lock then the hold
 * count is incremented by one and the method returns immediately.
 *
 * <p>If the lock is held by another thread then the
 * current thread becomes disabled for thread scheduling
 * purposes and lies dormant until the lock has been acquired,
 * at which time the lock hold count is set to one.
 */
public void lock() {
    sync.lock();
}

可以看到,在ReentrantLock中调用了sync.lock()方法。那么sync是什么?它的实现的lock方法又做了那些事情呢:

首先可以知道,sync是一个ReentrantLock内部的同步器。具体分为两种:FairSync【公平锁】和NonfairSync【非公平锁】。

/** Synchronizer providing all implementation mechanics */
private final Sync sync;
 /**
  * Sync object for fair locks
  */
static final class FairSync extends Sync {...}
 /**
  * Sync object for fair locks
  */
static final class NonfairSync extends Sync {...}

那么,又是在那里选择我们要用何种锁的呢?我们看下ReentrantLock的构造函数:

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

其中,ReentrantLock提供了带有选择的构造函数。默认实现非公平锁(因为非公平锁效率高)。True为使用公平锁,False为使用非公平锁。后面,我们先以默认的非公平锁的模式解读下ReentrantLock的源码,之后再看公平锁的实现跟非公平锁的实现有哪些区别。

AbstractQueuedSynchronizer

在进入ReentrantLock源码学习之前,我们还应该关注下ReentrantLock继承的抽象类AQS抽象阻塞队列内部的一些东西。

1、state变量

/**
 * The synchronization state.
 */
private volatile int state;

该变量是所有继承自AQS的核心变量,其作用是判断当前锁对象是否被某个线程持有。当state=0时表示锁未被持有;当state=1时表示锁被持有,其他线程无法获取锁对象(独占性的特性)。并且,state变量由volatile关键字修饰,那么就表示这个变量是一个对所有线程可见的一个共享资源。

2、Node内部类

static final class Node {
    /** 表示该节点在共享模式下等待 */
    static final Node SHARED = new Node();
    /** 表示该节点在独占模式下等待 */
    static final Node EXCLUSIVE = null;

    /** waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞 */
    static final int CANCELLED =  1;
    /** waitStatus为-1时表示该线程的后续线程需要阻塞,即只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程 */
    static final int SIGNAL    = -1;
    /** waitStatus为-2时,表示该线程在condition队列中阻塞(Condition有使用)*/
    static final int CONDITION = -2;
    /**
     * waitStatus为-3时,表示该线程以及后续线程进行无条件传播(CountDownLatch中有使用)共享模式下, PROPAGATE 状态的线程处于可运行状态 
     */
    static final int PROPAGATE = -3;

   /**
     * Node节点的状态
     */
    volatile int waitStatus;

   /**
     * 当前Node的前驱节点
     */
    volatile Node prev;

   /**
     * 当前Node的后置节点
     */
    volatile Node next;

   /**
     * Node节点中保存的线程
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Node表示节点对象,当有线程抢占锁对象失败的时候,失败的线程会被挂起并且转变为一个Node节点加入一个FIFO双向链表中以等待持有锁的线程释放锁对象后来唤醒FIFO中的Node节点中保存的线程来重新开始争抢锁对象。关键的一些信息,源码中已经注明注释意义。

非公平锁的实现:

lock方法:
/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
 /**
	* Atomically sets synchronization state to the given updated
	* value if the current state value equals the expected value.
	* This operation has memory semantics of a {@code volatile} read
	* and write.
	*
	* @param expect the expected value
	* @param update the new value
	* @return {@code true} if successful. False return indicates that the actual
	*         value was not equal to the expected value.
	*/
	protected final boolean compareAndSetState(int expect, int update) {
		// See below for intrinsics setup to support this
		return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 }
/**
  * Sets the thread that currently owns exclusive access.
  * A {@code null} argument indicates that no thread owns access.
  * This method does not otherwise impose any synchronization or
  * {@code volatile} field accesses.
  * @param thread the owner thread
  */
  protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

在非公平的lock方法中,我们可以看到其执行的流程:

1、通过CAS操作修改state的值。如果是0,则修改为1。

2、若state=0,则修改成功。通过setExclusiveOwnerThread(thread)方法将当前锁的持有线程设置为当前线程。

3、若state=1,则修改失败。表明当前锁已经被持有,由于独占性的原因。执行acquire(1)方法。

acquire方法

我们看下acquire(1)方法做了内些事情:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

当tryAcquire方法返回true时,将直接执行selfInterrupt()方法来自我中断并恢复当前线程。

执行tryAcquire(1)方法

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在tryAcquire(1)方法中,执行nonfairTryAcquire(1)方法。其中主要做的事情是:

1、再次尝试获取锁的state值,若为0,则当前线程持有锁。并设置锁的持有线程为当前线程。

2、如果state=1,则判断锁的持有线程是否为当前线程(重入性的体现)。如果是当前线程持有锁对象,那么代表此时锁被重入了。那么重入次数+1,更新重入次数state值,判断重入次数是否小于0(小于0代表出错了,抛出Error提示)。

3、如果当前锁没有被持有,或者当前锁被重入,都返回true;否则表示当前线程彻底获取锁失败,返回false。

addWaiter方法
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

当执行tryAcquire方法再次尝试获取锁失败的时候,将执行addWaiter方法。addWaiter的主要作用是:

1、设置当前节点为一个Node.EXCLUSIVE节点。

2、获取当前FIFO中的尾结点为前驱节点进行判断:当尾结点为空时,将当前节点执行enq(node)方法加入FIFO阻塞队列中;若尾结点不为空,说明FIFO中还有其他的阻塞节点。那么,将当前节点插入到当前FIFO中的尾结点之后。

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq(node)方法内部主要是一个死循环,用来保证FIFO队列的初始化得以执行。

1、当初始化时,新建一个Node节点放入FIFO中作为头节点。

2、当再次循环时,FIFO中不为空了,就执行else分支,将当前节点插入初始化的头节点后。

acquireQeued方法

当争抢锁失败的线程被转换为Node节点后,下来就执行acquireQeued方法来具体操作FIFO中的节点了。

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在acquireQeued方法中,主要做了一些事情:
1、仍然是一个死循环,循环内部首先获取当前节点的前驱节点判断是否为头节点。如果是头节点,那么再次尝试获取锁对象执行tryAcquire方法。

2、如果头节点获取锁成功,那么就表示当前节点可以作为头节点加入FIFO中。并设置p.next为空,以便于GC回收。设置获取锁失败的标志failed为false;返回线程中断标记为false,那么前驱节点的线程获取了锁,就可以通过自我中断来复位,继续执行逻辑了。

3、如果当前节点的前驱节点不是头节点,那么就调用park方法挂起自己。返回中断标记为true。

4、最后不论结果如何,都会判断是否当前线程获取锁失败。是的话,就取消当前线程的尝试获取锁的这一个操作。

/**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

至此,非公平锁的lock方法已经全部解析完毕,下面我梳理了一份流程图用于图文结合的理解。图中主要画出了核心流程,至于Node节点的状态变化可以自行参考源码和AQS中的部分关键字的描述理解。

流程图总结

并发编程--ReentrantLock

unlock方法

上面说完了ReentrantLock中的lock方法,那么下面就来一起看下ReentrantLock的释放锁的unlock方法的源码。

/**
 * Attempts to release this lock.
 *
 * <p>If the current thread is the holder of this lock then the hold
 * count is decremented.  If the hold count is now zero then the lock
 * is released.  If the current thread is not the holder of this
 * lock then {@link IllegalMonitorStateException} is thrown.
 *
 * @throws IllegalMonitorStateException if the current thread does not
 *         hold this lock
 */
public void unlock() {
    sync.release(1);
}

在ReentrantLock中,unlock放的具体实现是调用了sync的子类的release方法具体去实现的。我们一起看下release方法都做了什么事情。

release方法
/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release方法的执行流程如下:

1、调用tryRelease方法尝试释放锁。

2、释放失败返回false。

3、释放成功,则获取FIFO中的头节点。如果头节点不是空,并且状态不是0,那么就唤醒FIFO中的之前挂起的头节点中的线程使之再去争夺锁,并返回true。

那么,tryRelease方法看来是release方法的核心,那么它又做了那些事情?

tryRelease方法
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

tryRelease方法流程总结如下:

1、获取state变量值并-1,代表锁释放了一次。

2、判断当前线程是否为锁的持有线程,不是的话抛出IllegalMonitorStateException异常。

3、判断state的值是否已经减为0,是的话释放标志free标志为true,并清空锁的持有线程。

4、否则说明当前线程被冲入了,需要持续释放直到state==0,返回释放失败标志false。

流程图总结

并发编程--ReentrantLock

公平锁的实现

提问

我们学习完了ReentrantLock的非公平锁的源码。我们先思考一个问题:

Q:当非公平锁的实现unlock方法执行的时候,我们从FIFO中唤醒了头节点中被挂起的线程去重新争抢锁。那么有一个问题来了,此时外部又来了一批新的请求线程来获取锁,那么此时被唤醒的头节点线程和这些线程一起争抢锁,就一定不能保证谁拿到锁。那么对于刚被唤醒的头节点的线程来说,这岂不是太不公平了?

A:是的,这就是非公平性的体现,释放了你,并不一定你拿到锁。锁的获取不会遵循先来后到的规则。那么,公平锁的实现和原理我们就有了一个大概的了解 – 按照请求的顺序从FIFO中按次序来获取锁。同样可以理解,为什么ReentrantLock使用非公平锁来作为默认实现了。因为,公平锁效率太差了,要排队获取。

对于公平锁有了认知,那么我就直接说明那里的改动实现了公平性的特点。因为大部分代码都是一样的,所以可以参考非公平的实现。

lock方法
final void lock() {
    acquire(1);
}

看了源码,我们就有疑问了:公平锁的lock方法为什么比非公平少了CAS尝试获取锁?

很好理解,因为实现公平锁的话,不存在并发抢锁的问题。因为,这时是严格按照FIFO中的次序来获取锁的。

tryAcquire方法

下来,我们看下acquire中的执行方法tryAcquire对于公平锁是如何实现的?

/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

这里,我们发现,相对于非公平的实现,这里当state==0的分支里,多了一个判断hasQueuedPredecessors()方法的执行。

hasQueuedPredecessors方法
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

hasQueuedPredecessors方法主要是判断,当前FIFO中是否有比当前线程更早的节点。如果有,则将当前线程加入FIFO的尾部并挂起当前线程。

unlock方法
/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

不再赘述,同非公平的实现,判断FIFO中的头节点是否满足唤醒条件并唤醒去争夺锁。

思考

我们思考一个问题,在lock方法中,为什么要重复的循环?重复的尝试获取锁?而释放锁里却没有这些操作?

其实,这个就跟并发有关系了。在获取锁时,我们并没有对于lock方法加锁,也就是说执行lock方法的时候是会有并发的问题的。那么,不断的循环,不断的通过CAS尝试获取锁就是为了解决并发场景下那个线程获得锁这一不确定的问题。

然而,释放锁的时候,由于锁已经被当前线程获取了,不会存在并发的问题。所以,释放锁就可以视为单线程场景了。