欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ReentrantLock源码分析

程序员文章站 2022-05-04 20:13:00
...

ReentrantLock基本结构

ReentrantLock源码分析

ReentrantLock中有3个内部类,这个三个内部类的关系为
ReentrantLock源码分析
FairSync 是公平锁的主要实现类,NonfairSync 是非公平锁的主要实现类,默认为非公平锁的实现。

非公平锁加锁过程

非公平锁的加锁方法,加锁默认为非公平锁

ReentrantLock reentrantLock = new ReentrantLock();
        reentrantLock.lock();

1、构造方法中没有任何参数的时候,sync 默认为非公平锁的实现。
ReentrantLock源码分析

ReentrantLock源码分析
这是NonfairSync中实现lock方法

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
        //没有lock的状态是0 ,AQS将其设置为1 ,变成有锁状态,
        //调用父类中的的AbstractQueuedSynchronizer中的方法
            if (compareAndSetState(0, 1))
            //调用父类AbstractOwnableSynchronizer的方法,设置为独占锁
                setExclusiveOwnerThread(Thread.currentThread());
            else
            //如何获取锁事变,则调用acquire 去加锁1次acquire逻辑(调用父类AbstractQueuedSynchronizer)
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }



/************************这里是NonfairSync的父类AbstractQueuedSynchronizer中的方法**************************/


    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;


  /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
     //这个是NonfairSync 调用  compareAndSetState,如果设置为加锁状态则为true,如果非加锁状态为false
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }


NonfairSync 中acquire逻辑(调用父类AbstractQueuedSynchronizer)

 public final void acquire(int arg) {
 //tryAcquire为本身子类NonfairSync 所重写, 也是让新来的线程进行第二次插队的机会
	//如果再次获取锁还不成功才会放到队列,放到队列里面acquireQueued()
        if (!tryAcquire(arg) &&
        //
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

//NonfairSync 重写的tryAcquire方法
   protected final boolean tryAcquire(int acquires) {
   //调用ReentrantLock 中 nonfairTryAcquire 方法
            return nonfairTryAcquire(acquires);
        }


 /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
         //ReentrantLock 中 nonfairTryAcquire 方法
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
         //c==0 代表目前没有线程获取有锁
            if (c == 0) {
               //如果没有线程获取锁,则当前线程CAS获取锁。并设置自己为当前锁的独占线程
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前锁就是该线程,这将设置状态+acquires,也就是加1,这也是lock允许重入锁的原因
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                    //设置当前锁的数量
                setState(nextc);
                return true;
            }
            //当前有锁或者获取锁失败,且不是当前线程 返回false
            return false;
        }

当前面nonfairTryAcquire第二次获取锁失败,进入acquire 方法中addWaiter 方法和 acquireQueued 方法

/** Marker to indicate a node is waiting in exclusive mode */
//Node.EXCLUSIVE 为null表示这是独占锁,如果为读写锁,那就是 共享模式(shared)。
        static final Node EXCLUSIVE = null;
//该 mode 为Node.EXCLUSIVE 
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //定义pred为指针指向tail
        Node pred = tail;
       // pred 尾指针不为null,即队列不为空
        if (pred != null) {
            node.prev = pred;
            //快速CAS将自己设为新的tail,如果tailOffset 没有改变,既pred还是队尾,没有node改变过他
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果队列为空, 则调用enq强制入队
//或者CAS设置失败,说明在其它线程入队节点争抢了tail,则此线也是调用enq强制入队
        enq(node);
        return node;
    }


  /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }


 /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
     //一直在for循环中一直设置,该节点为尾节点,直到成功为止
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
            // 判断队列是否为空,空队列应该先初始化头节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                设置当前节点为尾节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
 /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

当二次抢锁失败之后,将节点设置尾队列末尾之后,将进入acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            //之前步骤已经将该节点放置在末尾了
                final Node p = node.predecessor();
                //如果该节点为最开始有效节点(头结点为初始的节点),再进行一次获取锁
                if (p == head && tryAcquire(arg)) {
                //如果设置成功,将该节点设置为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //如果第一次循环就获取成功那么返回的interrupted是false,不需要自我中断。
	 		   //否则 说明在获取到同步状态之前经历过挂起(返回true)
                    return interrupted;
                }
                //走到这里说明前置节点不为head或者抢锁失败了
                //判断当前node线程是否可挂起,是,就调用parkAndCheckInterrupt挂起 ,i
                //nterrupted设置为true,表示曾经被中断过
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
            //如果tryAcquire出现异常那么取消当前结点的获取(该tryAcquire为NonfairSync实现的)
                cancelAcquire(node);
        }
    }
/*******************node节点的等待状态*********************************/

 /** waitStatus value to indicate thread has cancelled */
 		//	由于在同步队列中等待线程等待超时或者被中断,需要从同步队列中取消等待
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        //后继节点为等待状态,当且节点线程如果释放同步状态或者被取消
        //,将后继节点的线程得以运行
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        //节点在等待队列中,其他线程对condition调用signal(),该节点会从等待队列转移到同步队列中,胶乳到对同步状态的获取中
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
         //表示下一次桐乡式同状态会被传播下去
        static final int PROPAGATE = -3;
        

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
           
     // 如果前置结点是SIGNAL状态, 那么当前置结点执行完成后是可以唤醒后续结点的,
	 // 此时可以安全的挂起当前结点, 不需要进行不必要的for(;;),前置结点自然会通知。
  
            return true;
            //   如果ws>0说明前置结点的状态为1,是被自己取消获取同步的结点(只有线程本身可以取消自己)。
	  //那么do while循环一直往头结点方向找waitStatus < 0的节点;
	  //含义就是去除了FIFO队列中的已经自我取消申请同步状态的线程。
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
                //一直找到为取消同步状态的线程
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
                //如果是其它状态waitStatus要不是0或着PROPAGATE,意味着当前结点需要一个signal但是尚未park,
			 // 所以调用者必须确保在挂起之前不能获取同步状态。
			// 并强行设置前置结点为SIGNAL。之所以CAS设置是因为,pred线程也会操作cancelAcquire来取消自己和node线程对pred的操作产生竞争条件。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

下图为非公平锁的获取流程图:
ReentrantLock源码分析

公平锁加锁过程


公平锁在构造方法中设置为true

ReentrantLock reentrantLock = new ReentrantLock(true);
        reentrantLock.lock();

ReentrantLock源码分析

 /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
        //与非公平锁一样,不一样在于公平锁与非公平锁,中的tryAcquire方法
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //为无锁状态
            if (c == 0) {
            ////拿到当前的同步状态, 如果是无锁状态, 则进行hasQueuedPredecessors方法逻辑
			//逻辑含义是:当前队列为空或本身是同步队列中的头结点。如果满足条件CAS获取同步状态,并设置当前独占线程。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

 public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        //只有  h !=t 则当前队列不为空,且 s.thread != Thread.currentThread() 当前线程不是头线程才返回true
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

公平锁与非公平锁的加锁区别


1、非公平锁在一开始lock的时候compareAndSetState(0, 1) 进行一次抢锁操作,当加锁失败之后后面抢锁入队等操作。

2、在重新tryAcquire 方法中,非公平锁直接compareAndSetState(0, 1) 进行一次抢锁操作,而公平锁,需要判断队列里面没有等待加锁线程或者自己是头结点才开始抢锁。

与公平锁相比,非公平锁做的 1、在没入队的时候拿到锁,避免过多队列操作维护成本。
2、尽量在睡眠前拿到锁,避免过多上下文切换

释放锁

由前面的代码可知,释放锁NonfairSync和FairSync都没重写,他们释放锁的逻辑是由ReentrantLock写的。故他们释放锁的逻辑是相同的。
ReentrantLock源码分析

   /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 当前这个头结点不一定是同步队列中的头结点
            // 因为可能是非公平锁的释放,不需要插入队列就特么插队获得同步了, 总之就是拿到头结点了。
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
//释放锁的逻辑(该线程本身只能操作自己的锁,所有该方法事前)
    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //如果锁已经被释放完毕了,则清空该线程读独占线程
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //如果该线程还有锁没有被释放,则更新当前锁次数
            setState(c);
            return free;
        }