欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

解读可重入锁——ReentrantLock&AQS,java8

程序员文章站 2022-05-04 17:53:00
...

一 , ReentrantLock简介

  首先我们可以查看ReentrantLock类,实现了Lock接口,其内部类都直接或者间接的继承了AQS类,则AQS肯定与ReentrantLock关系密切
解读可重入锁——ReentrantLock&AQS,java8

我们来个demo,让大家更直接的了解重入锁:

解读可重入锁——ReentrantLock&AQS,java8

二, 构造器

MyReentrantLock是为了写自己理解的注释,改了一下类名

    /**
     * ReentrantLock默认选择的就是不公平锁
     */
    public MyReentrantLock() {
        sync = new MyReentrantLock.NonfairSync();
    }

    /**
     * 给我正义的数,我就是公平锁,不然还是不公平锁
     * @param fair true 公平锁 false非公平锁
     */
    public MyReentrantLock(boolean fair) {
        sync = fair ? new MyReentrantLock.FairSync() : new 
        MyReentrantLock.NonfairSync();
    }

三,lock 方法

lock方法封装在Syn抽象类中
解读可重入锁——ReentrantLock&AQS,java8

公平锁和非公平锁类,都为lock做了重写
解读可重入锁——ReentrantLock&AQS,java8

3 .1 公平锁的 lock

lock方法中就一个获取锁的方法,还是借用AQS去获取方法
解读可重入锁——ReentrantLock&AQS,java8

3.1.1 AQS的 acquire 实现

  首先尝试获取锁,然后再考虑要不要进入阻塞队列

    /**
     *  首先调用该方法的线程,去尝试获取锁资源
     *  如果获取到锁,给个标记,当前执行的线程就是我的线程,state+1
     *  如果没有获取到锁,该线程写入阻塞队列(面壁思过,等下再来叫你)
     *
     *  怎样才能成功获取锁呢?
     *      1,当前锁空闲,且同一时刻也没有竞争
     *      2,这个锁本来就是由当前线程持有,就是重入
     *
     * @param arg 调用方写死的量为1
     */
    public final void acquire(int arg) {

        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(MyAbstractQueueSync.Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

3.1.2 公平锁尝试获取锁 tryAcquire尝试获取锁,是公平锁自己实现

         /**
         * 尝试直接获取锁
         * @param acquires
         * @return true 获取到锁,flase,没有获取到锁
         */
        protected final boolean tryAcquire(int acquires) {
            //获取当前线程引用
            final Thread current = Thread.currentThread();

            //获取当前线程锁的状态
            //第一个获取到锁 c == 0
            //第i次重入 c == i
            int c = getState();
            if (c == 0) {//第一次尝试直接获取锁

                //由于是公平锁,要讲究先来后到
                //首先判断阻塞队列中有没有排在它前面的线程
                // 因为有可能是上一个持有锁的线程刚刚释放锁, 队列里的线程还没来得及争抢, 本线程就乱入了
                //
                //判断阻塞队列中没有其它线程
                //compareAndSetState(0, acquires)
                // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
                // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了(到嘴的鸭子飞了)
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {

                    //获取到了锁,标记,,告诉大家,当前线程获取到锁了
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }

            //这个锁被人占了,确认一下是不是自己现在占着这个锁
            //是自己占着这个锁,且c!=0了,也就是重入了
            else if (current == getExclusiveOwnerThread()) {

                //acquires = 1 c>=1
                int nextc = c + acquires;
                if (nextc < 0)//这个线程占有这个锁的次数太多了,使int溢出了,抛出异常
                    throw new Error("Maximum lock count exceeded");
                //更新锁的状态为2,就是告诉别人,它获取到这个锁2次了
                //state为几就是这个线程获取到这个锁几次
                setState(nextc);
                return true;
            }
            return false;
        }
    }

3 .2 非公平锁的 lock

  非公平锁,就是很流氓的,想获取锁的时候就直接去获取,不排队。但是直接获取不到,又像其它人一样老实的遵守步骤了。

    /**
    * 试着直接执行获取锁,失败了之后就老实的去尝试获取锁
    */
    final void lock() {
        if (compareAndSetState(0, 1))//直接尝试cas操作获取锁,且获取成功了

           //获取到了锁,标记,,告诉大家,当前线程获取到锁了
           setExclusiveOwnerThread(Thread.currentThread());
        else
           acquire(1);
    }

四, 将线程加入阻塞队列

  将线程加入阻塞队列,我们得知道该阻塞队列结构是什么样的
  AQS的Node类的属性有:
  

        //线程的等待状态,执行的时候,状态为-1,等待时状态为0
        volatile int waitStatus;

        //上一个发起过获取锁的线程
        volatile MyAbstractQueueSync.Node prev;

        //下一个发起过获取锁的线程,,或者是空
        volatile MyAbstractQueueSync.Node next;

        //发起获取锁的线程
        volatile Thread thread;

        //下个等待线程
        MyAbstractQueueSync.Node nextWaiter;

  队列中全部都是等待的线程
解读可重入锁——ReentrantLock&AQS,java8

  如果头线程开始执行了:
    等待状态发生改变,切该线程有了得到锁的标识,head指向队列中的持有锁线程,tail指向队列尾。释放锁之后,等待状态又回到0。这样依次类推。
  解读可重入锁——ReentrantLock&AQS,java8

acquireQueued(addWaiter(MyAbstractQueueSync.Node.EXCLUSIVE), arg)
在尝试获取锁失败的时候,这句话就是将线程加入阻塞队列的调用

我们先看addWaiter方法:

    /**
     * 在队列中添加节点
     * @param mode
     * @return 添加的节点
     */
    private MyAbstractQueueSync.Node addWaiter(MyAbstractQueueSync.Node mode) {
        MyAbstractQueueSync.Node node = new MyAbstractQueueSync.Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //获取队尾的引用
        MyAbstractQueueSync.Node pred = tail;
        //如果阻塞队列不为空,队尾有指向的引用
        if (pred != null) {
            node.prev = pred;
            //在队列的尾部插入线程
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        //初始化队列,如果队列是空,有队列需要加入的时候
        //就 需要初始化队列,如果使用不上队列的时候,
        // 就不需要引入队列
        enq(node);
        return node;
    }

在看初始化以及争抢入队方法 enq:

    /**
     * 队列初始化
     * cas设置tail,直到争抢成功
     * @param node
     * @return
     */
    private MyAbstractQueueSync.Node enq(final MyAbstractQueueSync.Node node) {
        for (;;) {
            MyAbstractQueueSync.Node t = tail;
            //最开始的时候,队列为空,必定进入if
            if (t == null) { // Must initialize
                //cas来设置head和tail
                if (compareAndSetHead(new MyAbstractQueueSync.Node()))
                    tail = head;
            } else {
                // 争抢入队, 没抢到就继续for循环迭代.抢成功了就可以return了,不然一直循环.
                // 为什么是用cas来争抢呢? 因为怕是多个线程一起执行到这里
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

然后在回到 acquireQueued 方法:

     /**
     * 把node入队
     * @param node 刚刚插入队尾的节点
     * @param arg
     * @return
     */
    final boolean acquireQueued(final MyAbstractQueueSync.Node node, int arg) {
        //设置入队是否获取锁成功标志
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取节点的前驱
                final MyAbstractQueueSync.Node p = node.predecessor();
               //如果node前驱节点恰好是head
                //那么就可以在次尝试获取一次锁
                //队首节点很乐观,因为确实很可能马上轮到他来获取锁
                if (p == head && tryAcquire(arg)) {
                    //很幸运的是,它居然再一次的尝试成功获取了锁
                    //那head自然就要指到它的头上
                    setHead(node);
                    //那个刚刚那个线程,我帮你继续使用锁
                    //我也继续帮你GC(好人做到底)
                    p.next = null; // help GC
                    //肯定是获取成功了啊
                    failed = false;

                    //全村的希望,让大家终于摆脱死循环
                    return interrupted;
                }
                //不是head的后一个,或者是head的后一个,但是没有获取到锁,
                // 就到了这个if
                //获取锁失败了,是否要阻塞这个线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                        //在这阻塞,等待唤醒
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //发现之前哪里出现了异常,就执行
            if (failed)
                cancelAcquire(node);
        }
    }

我们在一层一层的扒下去,看看 shouldParkAfterFailedAcquire 是如果判断线程是否需要阻塞:

注意:我之前在图示队列结构的时候,表述了执行的线程,waitStatus为-1,但是在这个情况,阻塞的线程waitStatus也是-1,要分请waitStatus为-1时,线程的状态

     /**
     * 是否要阻塞该线程
     * @param pred node的前驱
     * @param node node
     * @return
     */
    private static boolean shouldParkAfterFailedAcquire(MyAbstractQueueSync.Node pred, MyAbstractQueueSync.Node node) {
        //获取前驱的等待状态
        int ws = pred.waitStatus;

        //如果线程前驱的等待状态为-1,即在执行状态,那么就阻塞node
        if (ws == MyAbstractQueueSync.Node.SIGNAL)
            return true;
        //如果线程前驱的等待状态大于0,即是1 CANCELLED 就把前面的等待状态为1的删了
        //删到直到不为1
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //ws只有1,0,-1,-2,-3
            //1,-1处理了,前面没有对waitStatus做操作
            //那么只剩0了,前驱的节点的ws设置为-1
            //在之前图表示的时候,为-1是执行状态,但这种状态是阻塞状态
            // 直到前一个线程释放了锁,才能使执行状态
            compareAndSetWaitStatus(pred, ws, MyAbstractQueueSync.Node.SIGNAL);
        }
        return false;
    }

在 parkAndCheckInterrupt 中:

   LockSupport.park(this)会挂起当前线程. 但是LockSupport.park
 还有一个隐藏功能. 就是, 如果先对一个线程unpark, 再对这个线程park,
  那么这次的park是失效的. 下一次park才会挂起.

  原因就是, 对一个没有被park的线程进行unpark的时候, 会把标志位
perm置为1. 而每次park的操作, 都是先去检查perm是否为1.

  如果是1, 那么置为0, 并且这次不挂起.

  如果perm为0, 那么就直接挂起这个线程.
     *如果线程被阻塞过,返回true
     * @return
     */
    private final boolean parkAndCheckInterrupt() {
        /**
         * 挂起当前线程
         */
        LockSupport.park(this);
        return Thread.interrupted();
    }

五,释放锁 unlock

  释放锁是调用Sync的release

    public void unlock() {
        sync.release(1);
    }

5.1 释放锁 release

    /**
     *  释放锁
     *  记录锁状态的计数器-1
     *  状态为0,就是彻底释放
     * @param arg
     * @return true 释放成功,false,释放失败
     */
    public final boolean release(int arg) {

        //首先尝试释放锁
        if (tryRelease(arg)) {
            //只有一层lock unlock结构,释放锁成功后,
            // 或者独占锁完全释放
            //其实就是这个线程把所有的lock都unlock了

            MyAbstractQueueSync.Node h = head;

            // 唤醒阻塞队列中的下一个等待的线程
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

5.1.1 尝试释放锁 tryRelease

  尝试释放锁是去掉一层lock unlock,如果是最后一层,给个标记,并且唤醒阻塞队列的下一个等待线程

    /**
    * 公平锁和非公平锁尝试释放锁
    * @param releases
    * @return
    */
    protected final boolean tryRelease(int releases) {
        //将线程持有锁的状态-1
        int c = getState() - releases;

        //当前线程是不是获取到锁的线程
        //是别的线程就抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        //用于标记是否可以完全释放锁
         boolean free = false;
        //如果状态是0,就可以完全释放锁了
        //是重入锁,这也是释放的最后的一把锁了
        //不是重入锁,就直接释放了
        if (c == 0) {
            //表示完全释放
            free = true;
            //设置持有该锁的线程没有
            setExclusiveOwnerThread(null);
        }
        //不是最后一层锁,就释放一层
        setState(c);
        return free;
    }

5.2.2 唤醒队列的挂起线程 unparkSuccessor

   /**
     * 唤醒node
     * @param node 这个node是head节点
     */
    private void unparkSuccessor(MyAbstractQueueSync.Node node) {
        //获取等待状态
        int ws = node.waitStatus;
        //如果ws<0,就将其变为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //获取head的后继节点
        MyAbstractQueueSync.Node s = node.next;
        //如果head的后继节点不存在,或者ws为1
        if (s == null || s.waitStatus > 0) {
            //head后继节点指向null
            s = null;
            for (MyAbstractQueueSync.Node t = tail; t != null && t != node; t = t.prev)
                //就从队列的后面往前面找,找到最前面一个ws<0的,但是又不是head的节点
                if (t.waitStatus <= 0)
                    s = t;
        }
        //就把该节点唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

六,总结

  看完源码之后:

  1. Reentrant的实现和AQS密切分不开,使用到的阻塞队列,获取锁的方式都依赖于AQS
  2. 在开发时也可以视情况而选择锁,lock锁在适当的时候会更优于synchronized
相关标签: ReentrantLock AQS

上一篇: Java线程池简介

下一篇: java_线程池