Java concurrency之公平锁(一)_动力节点Java学院整理
基本概念
本章,我们会讲解“线程获取公平锁”的原理;在讲解之前,需要了解几个基本概念。后面的内容,都是基于这些概念的;这些概念可能比较枯燥,但从这些概念中,能窥见“java锁”的一些架构,这对我们了解锁是有帮助的。
1. aqs -- 指abstractqueuedsynchronizer类。
aqs是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。aqs是独占锁(例如,reentrantlock)和共享锁(例如,semaphore)的公共父类。
2. aqs锁的类别 -- 分为“独占锁”和“共享锁”两种。
(01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过clh等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视clh等待队列而直接获取锁。独占锁的典型实例子是reentrantlock,此外,reentrantreadwritelock.writelock也是独占锁。
(02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。juc包中的reentrantreadwritelock.readlock,cyclicbarrier, countdownlatch和semaphore都是共享锁。这些锁的用途和原理,在以后的章节再详细介绍。
3. clh队列 -- craig, landin, and hagersten lock queue
clh队列是aqs中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。clh就是管理这些“等待锁”的线程的队列。
clh是一个非阻塞的 fifo 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 cas 保证节点插入和移除的原子性。
4. cas函数 -- compare and swap
cas函数,是比较并交换函数,它是原子操作函数;即,通过cas操作的数据都是以原子方式进行的。例如,compareandsethead(), compareandsettail(), compareandsetnext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。
本章是围绕“公平锁”如何获取锁而层次展开。“公平锁”涉及到的知识点比较多,但总的来说,不是特别难;如果读者能读懂aqs和reentrantlock.java这两个类的大致意思,理解锁的原理和机制也就不成问题了。本章只是作者本人对锁的一点点理解,希望这部分知识能帮助您了解“公平锁”的获取过程,认识“锁”的框架。
reentrantlock数据结构
reentrantlock的uml类图
从图中可以看出:
(01) reentrantlock实现了lock接口。
(02) reentrantlock与sync是组合关系。reentrantlock中,包含了sync对象;而且,sync是aqs的子类;更重要的是,sync有两个子类fairsync(公平锁)和nonfairsync(非公平锁)。reentrantlock是一个独占锁,至于它到底是公平锁还是非公平锁,就取决于sync对象是"fairsync的实例"还是"nonfairsync的实例"。
获取公平锁(基于jdk1.7.0_40)
通过前面“java多线程系列--“juc锁”02之 互斥锁reentrantlock”的“示例1”,我们知道,获取锁是通过lock()函数。下面,我们以lock()对获取公平锁的过程进行展开。
1. lock()
lock()在reentrantlock.java的fairsync类中实现,它的源码如下:
final void lock() { acquire(1); }
说明:“当前线程”实际上是通过acquire(1)获取锁的。
这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取到了,它的状态值就变成了1。
由于reentrantlock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。也就是说,初次获取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推...这就是为什么获取锁时,传入的参数是1的原因了。
可重入就是指锁可以被单个线程多次获取。
2. acquire()
acquire()在aqs中实现的,它的源码如下:
public final void acquire(int arg) { if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); }
(01) “当前线程”首先通过tryacquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
(02) “当前线程”尝试失败的情况下,先通过addwaiter(node.exclusive)来将“当前线程”加入到"clh队列(非阻塞的fifo队列)"末尾。clh队列就是线程等待队列。
(03) 再执行完addwaiter(node.exclusive)之后,会调用acquirequeued()来获取锁。由于此时reentrantlock是公平锁,它会根据公平性原则来获取锁。
(04) “当前线程”在执行acquirequeued()时,会进入到clh队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquirequeued会返回true,此时"当前线程"会调用selfinterrupt()来自己给自己产生一个中断。至于为什么要自己给自己产生一个中断,后面再介绍。
上面是对acquire()的概括性说明。下面,我们将该函数分为4部分来逐步解析。
一. tryacquire()
1. tryacquire()
公平锁的tryacquire()在reentrantlock.java的fairsync类中实现,源码如下:
protected final boolean tryacquire(int acquires) { // 获取“当前线程” final thread current = thread.currentthread(); // 获取“独占锁”的状态 int c = getstate(); // c=0意味着“锁没有被任何线程锁拥有”, if (c == 0) { // 若“锁没有被任何线程锁拥有”, // 则判断“当前线程”是不是clh队列中的第一个线程线程, // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。 if (!hasqueuedpredecessors() && compareandsetstate(0, acquires)) { setexclusiveownerthread(current); return true; } } else if (current == getexclusiveownerthread()) { // 如果“独占锁”的拥有者已经为“当前线程”, // 则将更新锁的状态。 int nextc = c + acquires; if (nextc < 0) throw new error("maximum lock count exceeded"); setstate(nextc); return true; } return false; }
说明:根据代码,我们可以分析出,tryacquire()的作用就是尝试去获取锁。注意,这里只是尝试!
尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。后面我们会说明,在尝试失败的情况下,是如何一步步获取锁的。
2. hasqueuedpredecessors()
hasqueuedpredecessors()在aqs中实现,源码如下:
public final boolean hasqueuedpredecessors() { node t = tail; node h = head; node s; return h != t && ((s = h.next) == null || s.thread != thread.currentthread()); }
说明: 通过代码,能分析出,hasqueuedpredecessors() 是通过判断"当前线程"是不是在clh队列的队首,来返回aqs中是不是有比“当前线程”等待更久的线程。下面对head、tail和node进行说明。
3. node的源码
node就是clh队列的节点。node在aqs中实现,它的数据结构如下:
private transient volatile node head; // clh队列的队首 private transient volatile node tail; // clh队列的队尾 // clh队列的节点 static final class node { static final node shared = new node(); static final node exclusive = null; // 线程已被取消,对应的waitstatus的值 static final int cancelled = 1; // “当前线程的后继线程需要被unpark(唤醒)”,对应的waitstatus的值。 // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。 static final int signal = -1; // 线程(处在condition休眠状态)在等待condition唤醒,对应的waitstatus的值 static final int condition = -2; // (共享锁)其它线程获取到“共享锁”,对应的waitstatus的值 static final int propagate = -3; // waitstatus为“cancelled, signal, condition, propagate”时分别表示不同状态, // 若waitstatus=0,则意味着当前线程不属于上面的任何一种状态。 volatile int waitstatus; // 前一节点 volatile node prev; // 后一节点 volatile node next; // 节点所对应的线程 volatile thread thread; // nextwaiter是“区别当前clh队列是 ‘独占锁'队列 还是 ‘共享锁'队列 的标记” // 若nextwaiter=shared,则clh队列是“独占锁”队列; // 若nextwaiter=exclusive,(即nextwaiter=null),则clh队列是“共享锁”队列。 node nextwaiter; // “共享锁”则返回true,“独占锁”则返回false。 final boolean isshared() { return nextwaiter == shared; } // 返回前一节点 final node predecessor() throws nullpointerexception { node p = prev; if (p == null) throw new nullpointerexception(); else return p; } node() { // used to establish initial head or shared marker } // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。 node(thread thread, node mode) { // used by addwaiter this.nextwaiter = mode; this.thread = thread; } // 构造函数。thread是节点所对应的线程,waitstatus是线程的等待状态。 node(thread thread, int waitstatus) { // used by condition this.waitstatus = waitstatus; this.thread = thread; } }
说明:
node是clh队列的节点,代表“等待锁的线程队列”。
(01) 每个node都会一个线程对应。
(02) 每个node会通过prev和next分别指向上一个节点和下一个节点,这分别代表上一个等待线程和下一个等待线程。
(03) node通过waitstatus保存线程的等待状态。
(04) node通过nextwaiter来区分线程是“独占锁”线程还是“共享锁”线程。如果是“独占锁”线程,则nextwaiter的值为exclusive;如果是“共享锁”线程,则nextwaiter的值是shared。
4. compareandsetstate()
compareandsetstate()在aqs中实现。它的源码如下:
protected final boolean compareandsetstate(int expect, int update) { return unsafe.compareandswapint(this, stateoffset, expect, update); }
说明: compareandswapint() 是sun.misc.unsafe类中的一个本地方法。对此,我们需要了解的是 compareandsetstate(expect, update) 是以原子的方式操作当前线程;若当前线程的状态为expect,则设置它的状态为update。
5. setexclusiveownerthread()
setexclusiveownerthread()在abstractownablesynchronizer.java中实现,它的源码如下:
// exclusiveownerthread是当前拥有“独占锁”的线程 private transient thread exclusiveownerthread; protected final void setexclusiveownerthread(thread t) { exclusiveownerthread = t; }
说明:setexclusiveownerthread()的作用就是,设置线程t为当前拥有“独占锁”的线程。
6. getstate(), setstate()
getstate()和setstate()都在aqs中实现,源码如下:
// 锁的状态 private volatile int state; // 设置锁的状态 protected final void setstate(int newstate) { state = newstate; } // 获取锁的状态 protected final int getstate() { return state; }
说明:state表示锁的状态,对于“独占锁”而已,state=0表示锁是可获取状态(即,锁没有被任何线程锁持有)。由于java中的独占锁是可重入的,state的值可以>1。
小结:tryacquire()的作用就是让“当前线程”尝试获取锁。获取成功返回true,失败则返回false。
二. addwaiter(node.exclusive)
addwaiter(node.exclusive)的作用是,创建“当前线程”的node节点,且node中记录“当前线程”对应的锁是“独占锁”类型,并且将该节点添加到clh队列的末尾。
1.addwaiter()
addwaiter()在aqs中实现,源码如下:
private node addwaiter(node mode) { // 新建一个node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。 node node = new node(thread.currentthread(), mode); node pred = tail; // 若clh队列不为空,则将“当前线程”添加到clh队列末尾 if (pred != null) { node.prev = pred; if (compareandsettail(pred, node)) { pred.next = node; return node; } } // 若clh队列为空,则调用enq()新建clh队列,然后再将“当前线程”添加到clh队列中。 enq(node); return node; }
说明:对于“公平锁”而言,addwaiter(node.exclusive)会首先创建一个node节点,节点的类型是“独占锁”(node.exclusive)类型。然后,再将该节点添加到clh队列的末尾。
2. compareandsettail()
compareandsettail()在aqs中实现,源码如下:
private final boolean compareandsettail(node expect, node update) { return unsafe.compareandswapobject(this, tailoffset, expect, update); }
说明:compareandsettail也属于cas函数,也是通过“本地方法”实现的。compareandsettail(expect, update)会以原子的方式进行操作,它的作用是判断clh队列的队尾是不是为expect,是的话,就将队尾设为update。
3. enq()
enq()在aqs中实现,源码如下:
private node enq(final node node) { for (;;) { node t = tail; if (t == null) { // must initialize if (compareandsethead(new node())) tail = head; } else { node.prev = t; if (compareandsettail(t, node)) { t.next = node; return t; } } } }
说明: enq()的作用很简单。如果clh队列为空,则新建一个clh表头;然后将node添加到clh末尾。否则,直接将node添加到clh末尾。
小结:addwaiter()的作用,就是将当前线程添加到clh队列中。这就意味着将当前线程添加到等待获取“锁”的等待线程队列中了。
三. acquirequeued()
前面,我们已经将当前线程添加到clh队列中了。而acquirequeued()的作用就是逐步的去执行clh队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。下面,我们看看acquirequeued()的具体流程。
1. acquirequeued()
acquirequeued()在aqs中实现,源码如下:
final boolean acquirequeued(final node node, int arg) { boolean failed = true; try { // interrupted表示在clh队列的调度中, // “当前线程”在休眠时,有没有被中断过。 boolean interrupted = false; for (;;) { // 获取上一个节点。 // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。 final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null; // help gc failed = false; return interrupted; } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true; } } finally { if (failed) cancelacquire(node); } }
说明:acquirequeued()的目的是从队列中获取锁。
2. shouldparkafterfailedacquire()
shouldparkafterfailedacquire()在aqs中实现,源码如下:
// 返回“当前线程是否应该阻塞” private static boolean shouldparkafterfailedacquire(node pred, node node) { // 前继节点的状态 int ws = pred.waitstatus; // 如果前继节点是signal状态,则意味这当前线程需要被unpark唤醒。此时,返回true。 if (ws == node.signal) return true; // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点'的前继节点”。 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitstatus > 0); pred.next = node; } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为signal状态。 compareandsetwaitstatus(pred, ws, node.signal); } return false; }
说明:
(01) 关于waitstatus请参考下表(中扩号内为waitstatus的值),更多关于waitstatus的内容,可以参考前面的node类的介绍。
cancelled[1] -- 当前线程已被取消
signal[-1] -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
condition[-2] -- 当前线程(处在condition休眠状态)在等待condition唤醒
propagate[-3] -- (共享锁)其它线程获取到“共享锁”
[0] -- 当前线程不属于上面的任何一种状态。
(02) shouldparkafterfailedacquire()通过以下规则,判断“当前线程”是否需要被阻塞。
规则1:如果前继节点状态为signal,表明当前节点需要被unpark(唤醒),此时则返回true。
规则2:如果前继节点状态为cancelled(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非cancelled状态)的节点,并返回false。
规则3:如果前继节点状态为非signal、非cancelled,则设置前继的状态为signal,并返回false。
如果“规则1”发生,即“前继节点是signal”状态,则意味着“当前线程”需要被阻塞。接下来会调用parkandcheckinterrupt()阻塞当前线程,直到当前先被唤醒才从parkandcheckinterrupt()中返回。
3. parkandcheckinterrupt())
parkandcheckinterrupt()在aqs中实现,源码如下:
private final boolean parkandcheckinterrupt() { // 通过locksupport的park()阻塞“当前线程”。 locksupport.park(this); // 返回线程的中断状态。 return thread.interrupted(); }
说明:parkandcheckinterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
它会先通过locksupport.park()阻塞“当前线程”,然后通过thread.interrupted()返回线程的中断状态。
这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况:
第1种情况:unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。
第2种情况:中断唤醒。其它线程通过interrupt()中断当前线程。
补充:locksupport()中的park(),unpark()的作用 和 object中的wait(),notify()作用类似,是阻塞/唤醒。
它们的用法不同,park(),unpark()是轻量级的,而wait(),notify()是必须先通过synchronized获取同步锁。
关于locksupport,我们会在之后的章节再专门进行介绍!
4. 再次tryacquire()
了解了shouldparkafterfailedacquire()和parkandcheckinterrupt()函数之后。我们接着分析acquirequeued()的for循环部分。
final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null; // help gc failed = false; return interrupted; }
说明:
(01) 通过node.predecessor()获取前继节点。predecessor()就是返回node的前继节点,若对此有疑惑可以查看下面关于node类的介绍。
(02) p == head && tryacquire(arg)
首先,判断“前继节点”是不是chl表头。如果是的话,则通过tryacquire()尝试获取锁。
其实,这样做的目的是为了“让当前线程获取锁”,但是为什么需要先判断p==head呢?理解这个对理解“公平锁”的机制很重要,因为这么做的原因就是为了保证公平性!
(a) 前面,我们在shouldparkafterfailedacquire()我们判断“当前线程”是否需要阻塞;
(b) 接着,“当前线程”阻塞的话,会调用parkandcheckinterrupt()来阻塞线程。当线程被解除阻塞的时候,我们会返回线程的中断状态。而线程被解决阻塞,可能是由于“线程被中断”,也可能是由于“其它线程调用了该线程的unpark()函数”。
(c) 再回到p==head这里。如果当前线程是因为其它线程调用了unpark()函数而被唤醒,那么唤醒它的线程,应该是它的前继节点所对应的线程(关于这一点,后面在“释放锁”的过程中会看到)。 ok,是前继节点调用unpark()唤醒了当前线程!
此时,再来理解p==head就很简单了:当前继节点是clh队列的头节点,并且它释放锁之后;就轮到当前节点获取锁了。然后,当前节点通过tryacquire()获取锁;获取成功的话,通过sethead(node)设置当前节点为头节点,并返回。
总之,如果“前继节点调用unpark()唤醒了当前线程”并且“前继节点是clh表头”,此时就是满足p==head,也就是符合公平性原则的。否则,如果当前线程是因为“线程被中断”而唤醒,那么显然就不是公平了。这就是为什么说p==head就是保证公平性!
小结:acquirequeued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有并中断过。
四. selfinterrupt()
selfinterrupt()是aqs中实现,源码如下:
private static void selfinterrupt() { thread.currentthread().interrupt(); }
说明:selfinterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢?
这必须结合acquirequeued()进行分析。如果在acquirequeued()中,当前线程被中断过,则执行selfinterrupt();否则不会执行。
在acquirequeued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkandcheckinterrupt()中,我们线程的中断状态时调用了thread.interrupted()。该函数不同于thread的isinterrupted()函数,isinterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfinterrupt()重新产生一个中断!
小结:selfinterrupt()的作用就是当前线程自己产生一个中断。
总结
再回过头看看acquire()函数,它最终的目的是获取锁!
public final void acquire(int arg) { if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); }
(01) 先是通过tryacquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquirequeued()获取锁。
(02) 尝试失败的情况下,会先通过addwaiter()来将“当前线程”加入到"clh队列"末尾;然后调用acquirequeued(),在clh队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfinterrupt()来自己产生一个中断。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
推荐阅读
-
Java concurrency之公平锁(一)_动力节点Java学院整理
-
Java concurrency之LockSupport_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(一)_动力节点Java学院整理
-
Java concurrency线程池之线程池原理(二)_动力节点Java学院整理
-
Java concurrency集合之CopyOnWriteArraySet_动力节点Java学院整理
-
Java concurrency集合之LinkedBlockingDeque_动力节点Java学院整理
-
Java concurrency之Condition条件_动力节点Java学院整理
-
Java concurrency之CountDownLatch原理和示例_动力节点Java学院整理
-
servlet之cookie简介_动力节点Java学院整理
-
servlet之session工作原理简介_动力节点Java学院整理