线程并发情况下如何使用ReentrantLock同步机制
前言
在开发的过程中,许多并发的场景下,有可能会出现线程不安全的实例,我们可以使用Synchronized与ReentrantLock进行互斥同步的调用,相信大家Synchronized已经很熟悉了。今天主要详细介绍的是J.U.C包下的ReentrantLock,本文主要是对ReentrantLock的加锁与解锁机制进行一个深入解析~
ReentrantLock,顾名思义,是一个可重入的锁,是一种递归无阻塞的同步机制。比较Synchronzed更有公平锁与非公平锁的两种模式。内部主要利用CAS+AQS队列来实现。
ReentrantLock介绍
首先可以看到ReentrantLock实现了Lock中定义的方法。Lock接口中定义了最基本的加锁,与尝试加锁的方法,会在ReentrantLock做一个实现。
public class ReentrantLock implements Lock, java.io.Serializable
Node 状态
Node节点是对每个等待资源的线程的一个封装,每个节点内部除了封装线程,还有当前节点的状态值。根据节点状态和在队列中的情况,对同步机制做出处理。
CANCELLED(1): 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1): 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点从原来的状态更新为SIGNAL。
CONDITION(-2): 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0:新结点入队时的默认状态。
ReentrantLock 同步机制
ReentrantLock有两个构造函数,用来设定返回公平锁以及非公平锁。
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/ public ReentrantLock() { //默认的构造函数生成的是非公平锁 sync = new NonfairSync(); } /**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/ public ReentrantLock(boolean fair) { //根据传入的fair值判断是返回公平锁还是非公平锁 sync = fair ? new FairSync() : new NonfairSync(); }
在通过公平锁或者是非公平锁生成一个实例对象锁后,后续的同步都会围绕该实例锁进行,如果new了一个新的实例锁,那么将是不一样的阻塞队列和锁的获取。可以将一个实例比对为一个锁。
所以,公平锁或者是非公平锁又是什么呢?先看非公平锁
//非公平锁是一个集成了Sync的静态内部类 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/ //进行加锁的操作 final void lock() { //在调用lock的时候,不判断等待队列而是直接进行锁的占用,判断是否成功设置锁 if (compareAndSetState(0, 1)) //进行设置获取锁的线程 setExclusiveOwnerThread(Thread.currentThread()); else //尝试获取锁,不然则添加到等待队列 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
上面比较关键的是没有直接获取到锁而调用acquire(1);方法
//调用acquire方法,传入进行改变的锁状态值 public final void acquire(int arg) { //先尝试获取锁,如果获取锁失败了,则请求等待队列,将线程存入 if (!tryAcquire(arg) && //如果没有进行轮到执行,在这一步线程会进行阻塞,直到唤醒执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //默认执行的情况下interrupt为false,所以执行完毕就结束了,如果出现了中断,则会返回true,则会调用selfInterrupt(); // **为线程设置中断标识 selfInterrupt(); }
先来看看是如何请求获取锁的tryAcquire(arg)
//请求获取锁进行请求获取锁 protected final boolean tryAcquire(int acquires) { //引用非公平锁的获取锁 return nonfairTryAcquire(acquires); } //调用非公平锁获取锁,传入需要改变的锁变量值 final boolean nonfairTryAcquire(int acquires) { //首先获取到当前的线程 final Thread current = Thread.currentThread(); //获取有volatile所标识的当前锁状态值 int c = getState(); //如果锁状态值为0,代表没有线程占用锁 if (c == 0) { //使用cas改变当前的状态值 if (compareAndSetState(0, acquires)) { //设置锁占有是当前线程 setExclusiveOwnerThread(current); //返回true代表成功设置 return true; } } //如果锁状态值不为0, 并且当前线程就是占用锁的线程 else if (current == getExclusiveOwnerThread()) { //将原来的锁状态值加上新的需要添加的值 int nextc = c + acquires; //如果结果小于0,则异常 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //将state的值赋值为新的值 setState(nextc); //返回设置成功 return true; } //返回设置失败 return false; }
当锁设置成功后就返回true了,由**!tryAcquire(arg)** 直接跳出结束。如果返回的是false,那么就需要执行后续的逻辑acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。
一点点来看,先看Node.EXCLUSIVE是什么
/** Marker to indicate a node is waiting in exclusive mode */ //EXCLUSIVE是Node类的一个静态变量,标记以指示节点正在独占模式下等待。 static final Node EXCLUSIVE = null;
然后来查看addWaiter(Node.EXCLUSIVE),调用addWaiter方法传入节点状态。
/**
* Creates and enqueues node for current thread and given mode.
* 给当前线程和给定模式创造节点并传入队列
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/ //传入的是节点模式 private Node addWaiter(Node mode) { //创建一个节点粗出当前线程和节点模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //定义pred设置tail, tail为队列的尾节点 Node pred = tail; //如果尾部节点不为空 if (pred != null) { //设置要存入节点的前置节点为tail node.prev = pred; //通过cas设置tail的属性,传入原来的tail和node节点 if (compareAndSetTail(pred, node)) { //设置成功后,将原来的tail的next指向node pred.next = node; //返回node return node; } } //如果原来的tail节点为空,或者cas设置tail失败,进入enq() enq(node); //结束后返回node return node; } /**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/ //传入需要传入的节点 private Node enq(final Node node) { //设置一个for死循环,内部如果cas失败会重新进入for循环 for (;;) { //获取到tail节点的值 Node t = tail; //如果tail为null if (t == null) { // Must initialize //需要初始化头结点,cas由null变为新的node节点 if (compareAndSetHead(new Node())) //并将head赋值给tail tail = head; //如果cas失败则重新进入for循环 } else { //当获取的t不为null的时候,说明tail是有节点的,将node的前置节点设置为t node.prev = t; //然后通过cas设置将node设置为tail,失败的话就for循环重新设置 if (compareAndSetTail(t, node)) { //原tail.next = node t.next = node; //返回原来的tail节点 return t; } } } }
addWaiter() 方法会返回传入的node值,将线程信息摄入队列后,然后传入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/ //传入添加进队列的node 和 当前的锁状态值 final boolean acquireQueued(final Node node, int arg) { //设定failed为true boolean failed = true; try { //设定是否中断为false boolean interrupted = false; //进入for死循环 for (;;) { //获取当前node的前驱节点 final Node p = node.predecessor(); //如果前驱节点是head节点, 就去执行tryAcquire(arg),上面有解析过。会对锁状态值进行变换 if (p == head && tryAcquire(arg)) { //如果成功设置了,代表已经轮到了当前的节点执行 //并将head设置为当前的node, 并将内部指定的thread和prev置空 setHead(node); //将node的前驱节点的next设置为null,表示已经用不到了,方便gc p.next = null; // help GC //设置failed为false failed = false; //返回中断的情况,可能是下方改变了interrupted状态。 return interrupted; } //如果前置节点的状态为SIGNAL, 那么会执行parkAndCheckInterrupt() if (shouldParkAfterFailedAcquire(p, node) && //挂起线程并返回是否被中断, 如果后续线程被唤醒,会继续执行for循环, parkAndCheckInterrupt()) //如果被中断则设置中断情况为true interrupted = true; } } finally { //如果中途出现异常,使得failed为true。 if (failed) //调用cancelAcquire(node) 详细下面有方法 cancelAcquire(node); } } /**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前置节点的状态。 int ws = pred.waitStatus; //如果前置节点的状态为SIGNAL,表示等待触发状态,那么当前节点就可以挂起 if (ws == Node.SIGNAL) /*
* This node has already set status asking a release
* to signal it, so it can safely park.
*
*/ //返回true。 return true; //如果前置节点状态>0,代表是CANCELLED状态,表示当前线程被取消 if (ws > 0) { /*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/ //进入do while循环 do { //则将pred = pred.prev, 然后让node 的前置节点 = pred node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //让pred的下一个节点设置为node。 这样就跳过了中间为CANCELLED状态的节点。 pred.next = node; } else { /*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/ //现在waitStatus只能有2种状态,一个是0一个是传播。节点会将前节点转化为SIGNAL状态。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } //返回false return false; } /**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/ private final boolean parkAndCheckInterrupt() { //通过LockSupport挂起当前线程,直到被唤醒 LockSupport.park(this); //返回线程的中断状态 return Thread.interrupted(); } /**
* Cancels an ongoing attempt to acquire.
* 取消正在进行的尝试获取
* @param node the node
*/ private void cancelAcquire(Node node) { // Ignore if node doesn't exist //如果传入的node为null if (node == null) //直接返回 return; //设置节点内的线程标识位null node.thread = null; // Skip cancelled predecessors //获取到节点的前驱节点 Node pred = node.prev; //如果前驱节点状态大于0,那就是CANCELLED状态 while (pred.waitStatus > 0) //将中间所有节点状态为CANCCELLED的节点进行排除。 node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. //获取到下一个节点 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //设置当前节点的状态为CANCELLED,这样别的节点在运行的时候会跳过当前节点。 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. //如果我们就是tail,那么使用cas将tail设置为pred if (node == tail && compareAndSetTail(node, pred)) { //并将下一个节点用通过cas设置为null compareAndSetNext(pred, predNext, null); } else { //不然如果node不为tail/尾部或者cas设置失败。 // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; //如果前驱节点不为head if (pred != head && //并且前驱节点的状态为SIGNAL ((ws = pred.waitStatus) == Node.SIGNAL || //或者状态值为0和传播,然后使用cas设定为SIGNAL成功 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && //并且前置节点的线程不为null pred.thread != null) { //获取节点的下一个节点 Node next = node.next; //如果下一个节点不为null 并且 下一个节点的状态 <= 0 if (next != null && next.waitStatus <= 0) //通过cas设置前置节点的下一个节点,由preNext替换为next。 compareAndSetNext(pred, predNext, next); } else { //唤醒下一个节点 unparkSuccessor(node); } node.next = node; // help GC } }
以上就是非公平锁线程执行任务,进入阻塞队列进行挂起等待直到轮到他被唤醒获取锁的过程,但是该线程在等待的情况下是如何被唤醒的呢?
来看看unlock() 方法
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
* 如果当前线程持有锁,则进行计数递减,如果计数为0则释放锁。如果当前线程没有持有锁
* 则会引发异常。
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/ //调用解锁 public void unlock() { //调用sync的release(1) sync.release(1); } /**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/ public final boolean release(int arg) { //尝试释放锁,递减锁状态值 if (tryRelease(arg)) { //如果锁进行释放了则获取当前的head节点 Node h = head; //如果head不为null并且head的节点状态不为0,即不为在队列中等待获取锁 if (h != null && h.waitStatus != 0) //执行unparkSuccessor unparkSuccessor(h); return true; } return false; } //尝试解锁 protected final boolean tryRelease(int releases) { //获取当前锁的状态值 - 需要递减的值 int c = getState() - releases; //判断当前的线程是否是持有锁的线程 if (Thread.currentThread() != getExclusiveOwnerThread()) //如果不是则异常 throw new IllegalMonitorStateException(); //设置free判断锁是否被释放 boolean free = false; //如果递减后的锁状态值为0,则代表锁已经被释放 if (c == 0) { free = true; //将当然持有锁的变为null setExclusiveOwnerThread(null); } //将修改后的值设置到锁状态值 setState(c); //返回是否解锁 return free; } /**
* Wakes up node's successor, if one exists.
* 唤醒后继节点,如果它存在
*
* @param node the node
*/ private void unparkSuccessor(Node node) { /*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/ //获取当前的节点状态 int ws = node.waitStatus; //如果节点状态值小于0 if (ws < 0) //cas将原本的值转换为0 compareAndSetWaitStatus(node, ws, 0); /*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/ //获取到node的下一个节点 Node s = node.next; //如果s为空或者s已经取消 if (s == null || s.waitStatus > 0) { //设置s为null s = null; //设置for循环,从后往前遍历,找到最开头的复合条件的node for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) //将符合条件的t赋值给s s = t; } //如果s不为null if (s != null) //就将s的线程解锁继续执行。 LockSupport.unpark(s.thread); }
唤醒队列的下一个线程后,下一个线程又能去获取锁,然后执行。但是因为是非公平锁,所以可能会被新的线程直接尝试获取锁,从而队列中的线程获取锁失败,继续挂起等待。这是非公平锁,而在公平锁中,则是按队列的顺序优先执行。
来看看公平锁是如何获取锁的
可以看见公平锁是调用acquire(1) 去获取锁,而非公平锁则是直接用cas进行先尝试获取,如果失败了再调用acquire(1) 去获取锁。
//这是公平锁的获取锁方式 final void lock() { acquire(1); } //这是非公平锁的获取锁方式 final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
总结
本文以自己的理解介绍了ReentrantLock的同步机制,仅仅只是对lock与unlock方法进行解析,如有理解错误的地方请在评论区探讨!
参考:Java并发之AQS详解
本文地址:https://blog.csdn.net/qq_41762594/article/details/108799288