欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并发编程之可重入锁ReentrantLock

程序员文章站 2022-07-13 17:03:06
...

ReentrantLock简介

ReentrantLock是一种可重入的独占锁。ReentrantLock构造方法:

    //默认构建非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    //传入公平参数,构建公平锁/非公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

从构造方法可知,ReentrantLock支持公平锁FairSync和非公平锁NonfairSync。默认情况下,创建ReentrantLock实例会得到一个非公平锁。其锁的功能由内部类Sync完成,Sync扩展自AQS,并依赖AQS的基础行为。Sync的抽象方法lock()由子类FairSync和NonfairSync各自实现。先以NonfairSync为例介绍非公平锁的加锁和解锁原理。

非公平锁

NonfairSync是ReentrantLock的非公平锁实现,它主要实现Sync中定义的抽象方法lock()和AQS中未实现的抽象方法tryAcquire。

加锁

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        //执行锁定
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        //尝试获取锁
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

加锁时,首先通过CAS操作将状态从0修改为1,如果修改成功,则修改锁的持有者为当前线程,加锁成功。

什么是CAS?这里简单说明下:

CAS 指的是现代 CPU 广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则返回V

如果修改状态失败,说明锁的状态不为0,仍被其他线程持有。调用acquire(1),acquire在AQS中提供了默认实现:

   public final void acquire(int arg) {
       if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
   }

调用tryAcquire()尝试获取锁,tryAcquire()调用nonfairTryAcquire方法,进入源码:

    final boolean nonfairTryAcquire(int acquires) {
         final Thread current = Thread.currentThread();
         int c = getState();
         //如果锁状态为0,则尝试锁定,成功返回true
         //这里是为了在并发条件下增加获取锁的可能性;若刚好之前持有锁的线程已经释放锁,则当前线程可再次尝试
         if (c == 0) {
             if (compareAndSetState(0, acquires)) {
                 setExclusiveOwnerThread(current);
                 return true;
             }
         }//否则,如果当前线程为锁的持有者,则state+acquires,这里即体现了ReentrantLock的可重入性
         else if (current == getExclusiveOwnerThread()) {
             int nextc = c + acquires;
             if (nextc < 0) // overflow
                 throw new Error("Maximum lock count exceeded");
             setState(nextc);
             return true;
         }
         return false;
    }

如果此次尝试成功获取锁,直接返回。否则获取锁失败,将当前线程加入等待队列。ReentrantLock是可重入的独占锁,因此以独占模式在链表中添加等待节点。

    private Node addWaiter(Node mode) {
        //创建当前结点
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        //tail节点不空,则使用CAS操作将node添加到链表尾部,成功则返回;失败则进入enq
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //tail节点为空,调用enq初始化链表
        //或节点加入链表尾部CAS操作失败,说明当前存在竞争
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        //由于这里存在多线程并发问题,使用自旋保证node能够添加到链表中,因此enq本身是线程安全的
        for (;;) {
            Node t = tail;
            //再次判断tail指向的节点是否为空,之所以再次判断,可能有其他线程已经初始化链表
            if (t == null) { 
                //多线程并发时,只有一个线程执行compareAndSetHead返回成功,执行失败的线程将进入else代码块
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //这里同样存在多线程并发,一次只会有一个线程成功,失败的线程将进入下一次循环,直至成功
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter保证将节点加入等待队列。若队列非空,首先会尝试一次将节点插入队列,若成功则无需进入自旋代码块。通常情况下,没有竞争时,无需自旋即可完成。进入自旋通常是因为CAS操作竞争比较激烈。当前节点加入等待队列之后,进入acquireQueued等待挂起。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取当前结点的前驱节点
                final Node p = node.predecessor();
                //前驱节点是head,表示当前节点之前已无节点在等待锁,再次尝试获取锁
                //这里不会和等待队列中的其他线程发生锁竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争
                if (p == head && tryAcquire(arg)) {
                    //获取锁成功,则设置node为新的head
                    setHead(node);
                    //释放节点,帮助GC
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
                //前驱节点不是head
                //parkAndCheckInterrupt会挂起当前线程,直到调用release收到唤醒信号
                if (shouldParkAfterFailedAcquire(p, node)&&parkAndCheckInterrupt())
                    interrupted = true;//如果线程被中断,则将interrupted设为true
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    //将node设为头部节点,并将thread和prev置空
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

如果当前节点的前驱节点为头节点,当前线程会再次尝试获取锁。如果再次尝试失败或当前节点的前驱节点不是头节点,调用shouldParkAfterFailedAcquire判断是否应当挂起当前线程,看实现:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //表示pred的后继节点node可以安全的挂起
        if (ws == Node.SIGNAL)
            return true;
        //表示pred处于取消状态
        if (ws > 0) {
            
            do {
                //从链表中移除pred,一直循环直到pred前面的节点不是取消状态
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //修改next指针
            pred.next = node;
        } else {
            //ws为0或者PROPAGATE,将pred状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    //阻塞当前线程,并返回线程的中断情况
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }
    private static void setBlocker(Thread t, Object arg) {
        unsafe.putObject(t, parkBlockerOffset, arg);
    }

只有当前节点的前驱节点的状态为SIGNAL时,线程可安全挂起。其他情况线程会再次进入自旋。

总体看来,shouldParkAfterFailedAcquire就是靠前驱节点判断当前线程是否应该被挂起,如果前驱节点为SIGNAL,则挂起当前线程;如果前继节点处于CANCELLED状态,移除取消节点,并更新当前节点pred指针;前驱节点状态为0(默认进入队列即为0)或PROPAGATE,设前驱节点的状态为SIGNAL,当前线程再次进入自旋,还是未获取锁就会被安全挂起。acquireQueued是一个自旋操作,线程在此挂起,直到被唤醒,满足p == head,执行tryAcquire(arg)尝试获取锁,成功则将当前节点设为新的head,该操作非常重要(想想为什么,后面解释)。

解锁

解锁操作实际调用AQS中的release方法:

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            //head不空且waitStatus不为0,唤醒后继节点;waitStatus为0则说明无线程在等待队列中。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release首先调用tryRelease尝试释放锁。

      protected final boolean tryRelease(int releases) {
            //如果线程被锁定多次,则需要进行多次释放
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

调用tryRelease返回true,说明锁已释放,判断head节点,若还有后继节点,调用unparkSuccessor进行唤醒。执行唤醒操作前,首先将当前节点(这里是head)的waitStatus置为0,再判断后继节点是否为取消状态,若为取消状态,需要使用pred指针从tail向前遍历,找到最靠近head的非CANCEL节点,使用LockSupport进行唤醒。

    private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //获取后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//s为空或node处于取消状态
            s = null;
            //从链表尾部向前遍历,直到找到位于链表最前端且waitStatus小于0的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//唤醒线程
    }

被唤醒的线程将继续在acquireQueued的自旋操作中进行锁竞争,直到成功获取锁。

上面提到acquireQueued中,线程(假设为线程B)获取锁之后,会调用setHead将当前节点设为新的head。原因是,当持有锁的线程(假设为线程A)释放锁之后,是否需要唤醒后继节点是根据head的waitStatus是否为0来判定,当调用unparkSuccessor进行唤醒时,又会将head的waitStatus置为0。假设线程B之后仍然有线程C在阻塞,线程A唤醒B后将head的waitStatus置为0,若不执行setHead,线程B释放锁时,看到head的waitStatus为0,则将不会唤醒线程C,线程C及后面的节点将一直阻塞。当执行setHead之后,若线程B之后仍然有线程在阻塞,则新的head节点waitStatus必然是SIGNAL,唤醒操作将有序进行。好精妙,佩服!!!

公平锁

FairSync是ReentrantLock的公平锁实现,公平锁的加锁原理:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        final void lock() {
            acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

由FairSync源码可知,加锁方法直接调用acquire,然后调用tryAcquire方法。与NonfairSync最大的区别就在于tryAcquire实现不同,首先调用hasQueuedPredecessors确定队列中没有等待线程,或者队列中第一个等待者为当前线程,FairSync才会尝试获取锁。否则,直接进入等待队列。

    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        //h != t,说明队列不空
        //(s = h.next) == null,队列中无排队线程
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

 

公平锁实现完全遵循FIFO的原则,在进入等待队列之前不会存在锁竞争的问题。即所有线程按公平原则获取锁,如果锁未被占用且队列为空,获取锁;如果队列为空,自动去排队,不去争抢锁。通常情况下,公平锁的效率不如非公平锁。

相关标签: Java