欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ReentrantLock源码解析(二)

程序员文章站 2022-05-05 09:59:43
...

1 数据结构

ReentrantLock是可重入锁,又分为公平锁和非公平锁。类图如下:
ReentrantLock源码解析(二)

1.1 AQS源码解析

https://blog.csdn.net/qq_34125999/article/details/105343472

1.2 Sync

/**
* ReentrantLock 基础结构
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
    
    private static final long serialVersionUID = -5179523762034025860L;
    
    /**
     * 抽象方法,加锁方式,由公平锁、非公平锁实现
     */
    abstract void lock();
    
    /**
     * 非公平锁获取资源
     */
    final boolean nonfairTryAcquire(int acquires) {
        //当前线程
        final Thread current = Thread.currentThread();
        //获取同步状态
        int c = getState();
        //如果状态为0,表示没有线程拥有资源
        if (c == 0) {
            //cas修改同步状态,修改成功设置获取资源的线程,返回抢占资源成功
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //如果当前线程就是持有资源的线程
        else if (current == getExclusiveOwnerThread()) {
            //同步状态增加acquires
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            //更新同步状态,返回抢占资源成功
            setState(nextc);
            return true;
        }
        //返回抢占资源失败
        return false;
    }
    
    /**
      * 释放资源
      */
    protected final boolean tryRelease(int releases) {
        //获取当前同步状态,减去释放的资源
        int c = getState() - releases;
        //判断释放资源的线程是否是当前线程
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        //资源标记
        boolean free = false;
        //释放资源后,同步状态为0,那么释放成功,设置占有资源的线程为null
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        //更新同步状态
        setState(c);
        //返回资源标记
        return free;
    }
    
    
    //判断当前线程是否是占有资源的线程
    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
    //创建condition
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    //获取占有资源的线程
    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }
    //获取同步状态
    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }
    //判断是否可以抢占资源
    final boolean isLocked() {
        return getState() != 0;
    }
    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

1.3 NonfairSync

/**
 * 非公平锁
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    /**
     * 加锁
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
        
    /**
     * 尝试获取资源
     */    
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

1.4 FairSync

/**
* 公平锁
*/
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    
    /**
     * 加锁
     */
    final void lock() {
        acquire(1);
    }
    /**
     * 尝试获取资源
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

}

2 非公平锁加锁

2.1 NonfairSync lock

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * 加锁
     */
    final void lock() {
        //cas抢资源,如果同步状态是0,当前线程抢到资源,那么就不用入队,直接干活
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //获取一个资源
            acquire(1);
    }
    
    /**
     * 尝试获取资源
     */
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

2.2 AbstractQueuedSynchronizer acquire

public final void acquire(int arg) {
    //尝试获取资源、加入队尾巴、进入等待
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

2.3 tryAcquire

(1) NonfairSync tryAcquire

protected final boolean tryAcquire(int acquires) {
   //非公平锁获取资源  
   return nonfairTryAcquire(acquires);
}

(2) Sync nonfairTryAcquire

final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取同步状态
    int c = getState();
    //当前状态为0,代表没有线程拥有资源
    if (c == 0) {
        //cas当前线程抢占资源,抢占成功,设置抢到资源的线程,返回抢占资源成功
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果当前线程是拥有资源的线程
    else if (current == getExclusiveOwnerThread()) {
        //同步状态增加资源量,更新状态,返回抢占资源成功
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //抢占资源失败
    return false;
}

2.4 AQS addWaiter(Node.EXCLUSIVE)

(1) addWaiter

/**
 * 添加等待者
 */
private Node addWaiter(Node mode) {
    //创建一个独占节点
    Node node = new Node(Thread.currentThread(), mode);
    //获取AQS队尾
    Node pred = tail;
    //如果队尾存在
    if (pred != null) {
        //当前节点的前指针指向队尾
        node.prev = pred;
        //cas快速更新队尾,(expect:队尾,update:当前节点).更新成功,更新以前队尾的后指针
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //自旋更新队尾
    enq(node);
    //返回当前节点
    return node;
}

(2) AQS enq(node)

/**
 * 自旋向CLH队列队尾插入数据
 */
private Node enq(final Node node) {
    //自旋
    for (;;) {
        //获取队尾
        Node t = tail;
        //如果队尾为空,创建CLH队列的队头、队尾
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //当前节点的前指针指向队尾
            node.prev = t;
            //cas设置队尾,设置成功后更新队尾的后指针,返回当前节点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

2.5 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

(1) AQS acquireQueued

/**
 * 自旋,等待 或 成功抢占资源返回
 */
final boolean acquireQueued(final Node node, int arg) {
    //失败标记
    boolean failed = true;
    try {
        //打断标记
        boolean interrupted = false;
        //自旋
        for (;;) {
            //获取当前节点的前驱节点
            final Node p = node.predecessor();
            //如果当前节点是CLH队列第二个节点,并且抢占了资源
            if (p == head && tryAcquire(arg)) {
                //更新头节节点
                setHead(node);
                //原头节点后指针清空
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //判断是否进入等待
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

(2) AQS shouldParkAfterFailedAcquire

/**
* 是否进入等待
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获得当前节点前驱节点的状态
    int ws = pred.waitStatus;
    
    //如果前节点是SIGNAL状态,进入等待状态,返回true
    if (ws == Node.SIGNAL)
        return true;
    //如果前驱点解的状态是(CANCELLED),寻找有效的前驱节点
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    }
    //cas把前驱节点设置为SIGNAL状态
    else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    //返回false,重新进入自旋
    return false;
}

(3) AQS parkAndCheckInterrupt

/**
* 调用LockSupport进行等待(LockSupport调用的是unsafe)
*/
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

2.6 时序图

ReentrantLock源码解析(二)

3 公平锁加锁

/**
* 流程和非公平锁差不多,当一个新线程进来非公平锁有抢资源操作,如果抢到资源那么不用进入CLH队列,非公平锁必须入CLH队列
* 所以非公平锁比公平锁快
*/
final void lock() {
    acquire(1);
}

4 解锁

4.1 ReentrantLock unlock

public void unlock() {
    sync.release(1);
}

4.2 AQS release

/**
 * 释放资源
 */
public final boolean release(int arg) {
    //尝试释放资源
    if (tryRelease(arg)) {
        //获取头指针
        Node h = head;
        //如果头指针不为空,状态是非CANCELLED状态,唤醒后续有效线程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

(1) AQS tryRelease

protected final boolean tryRelease(int releases) {
    //计算当前同步状态
    int c = getState() - releases;
    //如果释放资源线程不是拥有资源的线程则抛出错误
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    //空闲资源标记
    boolean free = false;
    //如果同步状态为0,表示当前没有线程拥有当前资源,空闲标记为true
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    //更新同步状态
    setState(c);
    return free;
}

(2) AQS unparkSuccessor

/**
*唤醒等待线程
*/
private void unparkSuccessor(Node node) {
    /*
     * 当前头节点的状态
     */
    int ws = node.waitStatus;
    
    //如果头节点的状态不是初始化状态,那么cas设置头节点的状态为初始化状态
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * 获取CLH队列第二个节点
     */
    Node s = node.next;
    
    //如果不存在第二个节点或者第二个节点为CANCELLED状态
    if (s == null || s.waitStatus > 0) {
        //s指向null
        s = null;
        //从尾部找有效节点并且赋值给s
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //如果s指向节点不为null,那么唤醒s节点的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}