欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

线程并发情况下如何使用ReentrantLock同步机制

程序员文章站 2022-03-10 10:36:01
在开发的过程中,许多并发的场景下,有可能会出现线程不安全的实例,我们可以使用Synchronized与ReentrantLock进行互斥同步的调用,相信大家Synchronized已经很熟悉了。今天主要详细介绍的是J.U.C包下的ReentrantLock,相对于Synchronized,ReentrantLock能够实现的功能会更多一些~...
前言

在开发的过程中,许多并发的场景下,有可能会出现线程不安全的实例,我们可以使用Synchronized与ReentrantLock进行互斥同步的调用,相信大家Synchronized已经很熟悉了。今天主要详细介绍的是J.U.C包下的ReentrantLock,本文主要是对ReentrantLock的加锁与解锁机制进行一个深入解析~

ReentrantLock,顾名思义,是一个可重入的锁,是一种递归无阻塞的同步机制。比较Synchronzed更有公平锁与非公平锁的两种模式。内部主要利用CAS+AQS队列来实现。
线程并发情况下如何使用ReentrantLock同步机制

ReentrantLock介绍

首先可以看到ReentrantLock实现了Lock中定义的方法。Lock接口中定义了最基本的加锁,与尝试加锁的方法,会在ReentrantLock做一个实现。

public class ReentrantLock implements Lock, java.io.Serializable 

Node 状态

Node节点是对每个等待资源的线程的一个封装,每个节点内部除了封装线程,还有当前节点的状态值。根据节点状态和在队列中的情况,对同步机制做出处理。
CANCELLED(1): 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1): 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点从原来的状态更新为SIGNAL。
CONDITION(-2): 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0:新结点入队时的默认状态。

ReentrantLock 同步机制

ReentrantLock有两个构造函数,用来设定返回公平锁以及非公平锁。

 /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */ public ReentrantLock() { //默认的构造函数生成的是非公平锁 sync = new NonfairSync(); } /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */ public ReentrantLock(boolean fair) { //根据传入的fair值判断是返回公平锁还是非公平锁 sync = fair ? new FairSync() : new NonfairSync(); } 

在通过公平锁或者是非公平锁生成一个实例对象锁后,后续的同步都会围绕该实例锁进行,如果new了一个新的实例锁,那么将是不一样的阻塞队列和锁的获取。可以将一个实例比对为一个锁。
所以,公平锁或者是非公平锁又是什么呢?先看非公平锁

 //非公平锁是一个集成了Sync的静态内部类 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */ //进行加锁的操作 final void lock() { //在调用lock的时候,不判断等待队列而是直接进行锁的占用,判断是否成功设置锁 if (compareAndSetState(0, 1)) //进行设置获取锁的线程 setExclusiveOwnerThread(Thread.currentThread()); else //尝试获取锁,不然则添加到等待队列 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } 

上面比较关键的是没有直接获取到锁而调用acquire(1);方法

 //调用acquire方法,传入进行改变的锁状态值 public final void acquire(int arg) { //先尝试获取锁,如果获取锁失败了,则请求等待队列,将线程存入 if (!tryAcquire(arg) && //如果没有进行轮到执行,在这一步线程会进行阻塞,直到唤醒执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //默认执行的情况下interrupt为false,所以执行完毕就结束了,如果出现了中断,则会返回true,则会调用selfInterrupt(); // **为线程设置中断标识 selfInterrupt(); } 

先来看看是如何请求获取锁的tryAcquire(arg)

 //请求获取锁进行请求获取锁 protected final boolean tryAcquire(int acquires) { //引用非公平锁的获取锁 return nonfairTryAcquire(acquires); } //调用非公平锁获取锁,传入需要改变的锁变量值 final boolean nonfairTryAcquire(int acquires) { //首先获取到当前的线程 final Thread current = Thread.currentThread(); //获取有volatile所标识的当前锁状态值 int c = getState(); //如果锁状态值为0,代表没有线程占用锁 if (c == 0) { //使用cas改变当前的状态值 if (compareAndSetState(0, acquires)) { //设置锁占有是当前线程 setExclusiveOwnerThread(current); //返回true代表成功设置 return true; } } //如果锁状态值不为0, 并且当前线程就是占用锁的线程 else if (current == getExclusiveOwnerThread()) { //将原来的锁状态值加上新的需要添加的值 int nextc = c + acquires; //如果结果小于0,则异常 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //将state的值赋值为新的值 setState(nextc); //返回设置成功 return true; } //返回设置失败 return false; } 

当锁设置成功后就返回true了,由**!tryAcquire(arg)** 直接跳出结束。如果返回的是false,那么就需要执行后续的逻辑acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
一点点来看,先看Node.EXCLUSIVE是什么

 /** Marker to indicate a node is waiting in exclusive mode */ //EXCLUSIVE是Node类的一个静态变量,标记以指示节点正在独占模式下等待。 static final Node EXCLUSIVE = null; 

然后来查看addWaiter(Node.EXCLUSIVE),调用addWaiter方法传入节点状态。

 /**
     * Creates and enqueues node for current thread and given mode.
     * 给当前线程和给定模式创造节点并传入队列
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */ //传入的是节点模式 private Node addWaiter(Node mode) { //创建一个节点粗出当前线程和节点模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //定义pred设置tail, tail为队列的尾节点 Node pred = tail; //如果尾部节点不为空 if (pred != null) { //设置要存入节点的前置节点为tail node.prev = pred; //通过cas设置tail的属性,传入原来的tail和node节点 if (compareAndSetTail(pred, node)) { //设置成功后,将原来的tail的next指向node pred.next = node; //返回node return node; } } //如果原来的tail节点为空,或者cas设置tail失败,进入enq() enq(node); //结束后返回node return node; } /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */ //传入需要传入的节点  private Node enq(final Node node) { //设置一个for死循环,内部如果cas失败会重新进入for循环 for (;;) { //获取到tail节点的值 Node t = tail; //如果tail为null if (t == null) { // Must initialize //需要初始化头结点,cas由null变为新的node节点 if (compareAndSetHead(new Node())) //并将head赋值给tail tail = head; //如果cas失败则重新进入for循环 } else { //当获取的t不为null的时候,说明tail是有节点的,将node的前置节点设置为t node.prev = t; //然后通过cas设置将node设置为tail,失败的话就for循环重新设置 if (compareAndSetTail(t, node)) { //原tail.next = node t.next = node; //返回原来的tail节点 return t; } } } } 

addWaiter() 方法会返回传入的node值,将线程信息摄入队列后,然后传入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法

 /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */ //传入添加进队列的node 和 当前的锁状态值 final boolean acquireQueued(final Node node, int arg) { //设定failed为true boolean failed = true; try { //设定是否中断为false boolean interrupted = false; //进入for死循环 for (;;) { //获取当前node的前驱节点 final Node p = node.predecessor(); //如果前驱节点是head节点, 就去执行tryAcquire(arg),上面有解析过。会对锁状态值进行变换 if (p == head && tryAcquire(arg)) { //如果成功设置了,代表已经轮到了当前的节点执行 //并将head设置为当前的node, 并将内部指定的thread和prev置空 setHead(node); //将node的前驱节点的next设置为null,表示已经用不到了,方便gc p.next = null; // help GC //设置failed为false  failed = false; //返回中断的情况,可能是下方改变了interrupted状态。 return interrupted; } //如果前置节点的状态为SIGNAL, 那么会执行parkAndCheckInterrupt() if (shouldParkAfterFailedAcquire(p, node) && //挂起线程并返回是否被中断, 如果后续线程被唤醒,会继续执行for循环, parkAndCheckInterrupt()) //如果被中断则设置中断情况为true interrupted = true; } } finally { //如果中途出现异常,使得failed为true。 if (failed) //调用cancelAcquire(node) 详细下面有方法 cancelAcquire(node); } } /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前置节点的状态。 int ws = pred.waitStatus; //如果前置节点的状态为SIGNAL,表示等待触发状态,那么当前节点就可以挂起 if (ws == Node.SIGNAL) /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             * 
             */ //返回true。 return true; //如果前置节点状态>0,代表是CANCELLED状态,表示当前线程被取消 if (ws > 0) { /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */ //进入do while循环 do { //则将pred = pred.prev, 然后让node 的前置节点 = pred node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //让pred的下一个节点设置为node。 这样就跳过了中间为CANCELLED状态的节点。 pred.next = node; } else { /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */ //现在waitStatus只能有2种状态,一个是0一个是传播。节点会将前节点转化为SIGNAL状态。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //返回false return false; } /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */ private final boolean parkAndCheckInterrupt() { //通过LockSupport挂起当前线程,直到被唤醒 LockSupport.park(this); //返回线程的中断状态 return Thread.interrupted(); } /**
     * Cancels an ongoing attempt to acquire.
     * 取消正在进行的尝试获取
     * @param node the node
     */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist //如果传入的node为null if (node == null) //直接返回 return; //设置节点内的线程标识位null node.thread = null; // Skip cancelled predecessors //获取到节点的前驱节点 Node pred = node.prev; //如果前驱节点状态大于0,那就是CANCELLED状态 while (pred.waitStatus > 0) //将中间所有节点状态为CANCCELLED的节点进行排除。 node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. //获取到下一个节点 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //设置当前节点的状态为CANCELLED,这样别的节点在运行的时候会跳过当前节点。 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. //如果我们就是tail,那么使用cas将tail设置为pred if (node == tail && compareAndSetTail(node, pred)) { //并将下一个节点用通过cas设置为null compareAndSetNext(pred, predNext, null); } else { //不然如果node不为tail/尾部或者cas设置失败。 // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; //如果前驱节点不为head if (pred != head && //并且前驱节点的状态为SIGNAL  ((ws = pred.waitStatus) == Node.SIGNAL || //或者状态值为0和传播,然后使用cas设定为SIGNAL成功 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && //并且前置节点的线程不为null pred.thread != null) { //获取节点的下一个节点 Node next = node.next; //如果下一个节点不为null 并且 下一个节点的状态 <= 0 if (next != null && next.waitStatus <= 0) //通过cas设置前置节点的下一个节点,由preNext替换为next。 compareAndSetNext(pred, predNext, next); } else { //唤醒下一个节点 unparkSuccessor(node); } node.next = node; // help GC } } 

以上就是非公平锁线程执行任务,进入阻塞队列进行挂起等待直到轮到他被唤醒获取锁的过程,但是该线程在等待的情况下是如何被唤醒的呢?
来看看unlock() 方法

 /**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     * 如果当前线程持有锁,则进行计数递减,如果计数为0则释放锁。如果当前线程没有持有锁
     * 则会引发异常。
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */ //调用解锁  public void unlock() { //调用sync的release(1) sync.release(1); } /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     * 
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */ public final boolean release(int arg) { //尝试释放锁,递减锁状态值 if (tryRelease(arg)) { //如果锁进行释放了则获取当前的head节点 Node h = head; //如果head不为null并且head的节点状态不为0,即不为在队列中等待获取锁 if (h != null && h.waitStatus != 0) //执行unparkSuccessor unparkSuccessor(h); return true; } return false; } //尝试解锁 protected final boolean tryRelease(int releases) { //获取当前锁的状态值 - 需要递减的值 int c = getState() - releases; //判断当前的线程是否是持有锁的线程 if (Thread.currentThread() != getExclusiveOwnerThread()) //如果不是则异常 throw new IllegalMonitorStateException(); //设置free判断锁是否被释放 boolean free = false; //如果递减后的锁状态值为0,则代表锁已经被释放 if (c == 0) { free = true; //将当然持有锁的变为null setExclusiveOwnerThread(null); } //将修改后的值设置到锁状态值 setState(c); //返回是否解锁 return free; } /**
     * Wakes up node's successor, if one exists.
     * 唤醒后继节点,如果它存在
     * 
     * @param node the node
     */ private void unparkSuccessor(Node node) { /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */ //获取当前的节点状态  int ws = node.waitStatus; //如果节点状态值小于0 if (ws < 0) //cas将原本的值转换为0 compareAndSetWaitStatus(node, ws, 0); /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */ //获取到node的下一个节点 Node s = node.next; //如果s为空或者s已经取消 if (s == null || s.waitStatus > 0) { //设置s为null s = null; //设置for循环,从后往前遍历,找到最开头的复合条件的node for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //将符合条件的t赋值给s s = t; } //如果s不为null if (s != null) //就将s的线程解锁继续执行。 LockSupport.unpark(s.thread); } 

唤醒队列的下一个线程后,下一个线程又能去获取锁,然后执行。但是因为是非公平锁,所以可能会被新的线程直接尝试获取锁,从而队列中的线程获取锁失败,继续挂起等待。这是非公平锁,而在公平锁中,则是按队列的顺序优先执行。
来看看公平锁是如何获取锁的
可以看见公平锁是调用acquire(1) 去获取锁,而非公平锁则是直接用cas进行先尝试获取,如果失败了再调用acquire(1) 去获取锁。

 //这是公平锁的获取锁方式 final void lock() { acquire(1); } //这是非公平锁的获取锁方式 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } 

总结

本文以自己的理解介绍了ReentrantLock的同步机制,仅仅只是对lock与unlock方法进行解析,如有理解错误的地方请在评论区探讨!

参考:Java并发之AQS详解

本文地址:https://blog.csdn.net/qq_41762594/article/details/108799288