java AQS源码阅读(三)共享锁的实现及synchronized的实现
1)独占功能:当锁被头节点获取后,只有头节点获取锁,其余节点的线程继续沉睡,
等待锁被释放后,才会唤醒下一个节点的线程。
2)共享功能:只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒AQS队列中的下一个节点的线程,
每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。
二、源码
AQS*享锁相关代码
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private volatile int state;//对于共享锁,这个state的作用类似计数器
/**
* 请求共享锁
*/
public final void acquireShared(int arg) {
//state != 0时,tryAcquireShared(arg) < 0,才会真正操作锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* 跟独占锁很像,只不过共享锁初始化时有传入一个count,count为
*/
private void doAcquireShared(int arg) {
//把当前线程封装到一个SHARE类型Node中,添加到SyncQueue尾巴上
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {//前继节点是head节点,下一个就到自己了
int r = tryAcquireShared(arg);//非公平锁实现,再尝试获取锁
//state==0时tryAcquireShared会返回>=0(CountDownLatch中返回的是1)。state为0说明共享次数已经到了,可以获取锁了
//注意上面说的, 等于0表示不用唤醒后继节点,大于0需要
if (r >= 0) {//r>0表示state==0,前继节点已经释放锁,锁的状态为可被获取
setHeadAndPropagate(node, r);//这一步设置node为head节点设置node.waitStatus->Node.PROPAGATE,然后唤醒node.thread
//唤醒head节点线程后,从这里开始继续往下走
p.next = null; //head已经指向node节点,oldHead.next索引置空,方便p节点对象回收
if (interrupted)
selfInterrupt();
return;
}
}
//前继节点非head节点,将前继节点状态设置为SIGNAL,通过park挂起node节点的线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
/**
* 把node节点设置成head节点,且node.waitStatus->Node.PROPAGATE
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;//h用来保存旧的head节点
setHead(node);//head引用指向node节点
/* 这里意思有两种情况是需要执行唤醒操作
* 1.propagate > 0 表示调用方指明了后继节点需要被唤醒
* 2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())//node是最后一个节点或者 node的后继节点是共享节点
/* 如果head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStatus->0
* head节点状态为0(第一次添加时是0),设置head.waitStatus->Node.PROPAGATE表示状态需要向后继节点传播
*/
doReleaseShared();//对于这个方法,其实就是把node节点设置成Node.PROPAGATE状态
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//state为0时,返回true(针对CountDownLatch)
doReleaseShared();
return true;
}
return false;
}
/**
* 把当前结点设置为SIGNAL或者PROPAGATE
* 唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head->B,然后又会唤醒B.next,一直重复直到共享节点都唤醒
* head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁
* head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//head是SIGNAL状态
/* head状态是SIGNAL,重置head节点waitStatus为0,这里不直接设为Node.PROPAGATE,
* 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
* 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
*/
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue;//设置失败,重新循环
/* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程
* 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点,
* 也就是head发生了变化。看最底下一行代码可知,head发生变化后会重新循环,继续唤醒head的下一个节点
*/
unparkSuccessor(h);
/*
* 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
* 意味着需要将状态向后一个节点传播
*/
} else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
if (h == head)//如果head变了,重新循环
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;//node.next
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒的是下一个可唤醒的线程
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {//去除CANCELLED节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
}
CountDownLatch共享锁源码
public class CountDownLatch {
//继承AQS,核心实现都在AQS里
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
//共享锁state的值可以自己设定,用作计算共享次数,这点跟排它锁(只能0/1)不同
setState(count);
}
int getCount() {
return getState();
}
/* tryAcquireShared返回值:
* < 0:表示获取锁失败,需要进入等待队列
* = 0:表示当前线程获取共享锁成功,但不需要把它后面等待的节点唤醒
* > 0:表示当前线程获取共享锁成功,且此时需要把后续节点唤醒让它们去尝试获取共享锁
*/
protected int tryAcquireShared(int acquires) {
/* getState()是初始化时传入的count值,getState>0,return -1,在AQS中会往下执行
* getState == 0时,return 1,在AQS中不往下走
*/
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)//state == 0 表示锁已经释放了
return false;
int nextc = c - 1;//每次调用tryReleaseShared,state值减1
if (compareAndSetState(c, nextc))
return nextc == 0;//state为0了,返回true,这时才真正去释放锁
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
}
总结:
1)共享锁初始化时会给state设值,所有请求锁的共享节点都会放入SyncQueue中阻塞
2)一个节点A获取锁(成为head节点)之后,会唤醒它的下一个共享节点线程B,B唤醒后会去竞争锁,B获取锁之后head节点就指向B节点了,此时会唤醒B的下一个节点C,C唤醒后又会去竞争锁,...,一直往下,直到后面的共享节点都唤醒为止。
此时所有共享节点都获取了锁,都可以往下执行了。
3)通过1)2)可知,共享锁是先阻塞多个线程,然后解锁后多个线程同时放开,都可以往下走。
可以用于多线程下,一个线程需要等待另一个线程执行到某一步的场景。
4)tryAcquireShared返回值:
< 0:表示获取锁失败,需要进入等待队列
= 0:表示当前线程获取共享锁成功,但不需要把它后面等待的节点唤醒
> 0:表示当前线程获取共享锁成功,且此时需要把后续节点唤醒让它们去尝试获取共享锁
而执行releaseShared之后,释放的是共享锁,此时无论共享锁还是独占锁都能竞争锁。
参考资料:
https://segmentfault.com/a/1190000011391092
===================================================================
1、锁的实现依赖:synchronized在软件层面依赖JVM
Lock在硬件层面依赖特殊的CPU指令
2、synchronized锁,锁住的是什么(可以把任何一个非null对象作为"锁")
1)当synchronized作用在方法上时,锁住的便是对象实例(this)
2)当作用在静态方法时锁住的便是对象对应的Class实例,因为Class数据存在于永久带,
因此静态方法锁相当于该类的一个全局锁;
3)当synchronized作用于某一个对象实例时,锁住的便是对应的代码块。
3、synchronized实现架构/流程
1)Contention List:所有请求锁的线程将被首先放置到该竞争队列
2)Entry List:Contention List中那些有资格成为候选人的线程被移到Entry List
3)OnDeck:任何时刻最多只能有一个线程正在竞争锁,该线程称为OnDeck
4)Owner:获得锁的线程称为Owner
5)Wait Set:那些调用wait方法被阻塞的线程被放置到Wait Set
流程:ContentionList -> EntryList -> OnDeck -> Owner -> WaitSet -> EntryList ->OnDeck循环下去
4、自旋锁
原理:有线程在竞争锁时,若Owner线程能在很短的时间内释放锁,则那些竞争线程可以稍微等一等(自旋),
在Owner线程释放锁后,竞争线程可能会立即得到锁,从而避免了系统阻塞。但Owner运行的时间可能会超出了临界值,
竞争线程自旋一段时间后还是无法获得锁,这时竞争线程会停止自旋进入阻塞状态。
基本思路就是自旋,获取锁,不成功再阻塞,尽量降低阻塞的可能性,这对那些执行时间很短的代码块来说有非常重要的性能提高。
线程自旋时什么都不做,可以执行几次for循环,可以执行几条空的汇编指令,目的是占着CPU不放,等待获取锁的机会。
5、偏向锁
偏向锁主要解决无竞争下的锁性能问题
无竞争下锁存在的问题:
现在几乎所有的锁都是可重入的,已经获得锁的线程可以多次锁住/解锁监视对象,每次加锁/解锁都会涉及到一些CAS操作(比如对等待队列的CAS操作),CAS操作会延迟本地调用。
原理:一旦线程第一次获得了监视对象,之后让监视对象“偏向”这个线程,之后的多次调用则可以避免CAS操作,
也就是置个变量,如果发现线程获取过监视对象,变量设置为true,无需再走各种加锁/解锁流程,直接获取锁。
上一篇: 互相独立进程间共享内存互斥访问的解决办法
下一篇: zookeeper实现共享锁