面试官都叫好的Synchronized底层实现,这工资开多少一个月?
本文为死磕synchronized底层实现第三篇文章,内容为重量级锁实现。
本系列文章将对hotspot的synchronized
锁实现进行全面分析,内容包括偏向锁、轻量级锁、重量级锁的加锁、解锁、锁升级流程的原理及源码分析,希望给在研究synchronized
路上的同学一些帮助。
重量级的膨胀和加锁流程
当出现多个线程同时竞争锁时,会进入到synchronizer.cpp#slow_enter
方法
void objectsynchronizer::slow_enter(handle obj, basiclock* lock, traps) { markoop mark = obj->mark(); assert(!mark->has_bias_pattern(), "should not see bias pattern here"); // 如果是无锁状态 if (mark->is_neutral()) { lock->set_displaced_header(mark); if (mark == (markoop) atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) { tevent (slow_enter: release stacklock) ; return ; } // fall through to inflate() ... } else // 如果是轻量级锁重入 if (mark->has_locker() && thread->is_lock_owned((address)mark->locker())) { assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (basiclock*)obj->mark(), "don't relock with same basiclock"); lock->set_displaced_header(null); return; } ... // 这时候需要膨胀为重量级锁,膨胀前,设置displaced mark word为一个特殊值,代表该锁正在用一个重量级锁的monitor lock->set_displaced_header(markoopdesc::unused_mark()); //先调用inflate膨胀为重量级锁,该方法返回一个objectmonitor对象,然后调用其enter方法 objectsynchronizer::inflate(thread, obj())->enter(thread); }
在inflate
中完成膨胀过程。
objectmonitor * attr objectsynchronizer::inflate (thread * self, oop object) { ... for (;;) { const markoop mark = object->mark() ; assert (!mark->has_bias_pattern(), "invariant") ; // mark是以下状态中的一种: // * inflated(重量级锁状态) - 直接返回 // * stack-locked(轻量级锁状态) - 膨胀 // * inflating(膨胀中) - 忙等待直到膨胀完成 // * neutral(无锁状态) - 膨胀 // * biased(偏向锁) - 非法状态,在这里不会出现 // case: inflated if (mark->has_monitor()) { // 已经是重量级锁状态了,直接返回 objectmonitor * inf = mark->monitor() ; ... return inf ; } // case: inflation in progress if (mark == markoopdesc::inflating()) { // 正在膨胀中,说明另一个线程正在进行锁膨胀,continue重试 tevent (inflate: spin while inflating) ; // 在该方法中会进行spin/yield/park等操作完成自旋动作 readstablemark(object) ; continue ; } if (mark->has_locker()) { // 当前轻量级锁状态,先分配一个objectmonitor对象,并初始化值 objectmonitor * m = omalloc (self) ; m->recycle(); m->_responsible = null ; m->owneristhread = 0 ; m->_recursions = 0 ; m->_spinduration = objectmonitor::knob_spinlimit ; // consider: maintain by type/class // 将锁对象的mark word设置为inflating (0)状态 markoop cmp = (markoop) atomic::cmpxchg_ptr (markoopdesc::inflating(), object->mark_addr(), mark) ; if (cmp != mark) { omrelease (self, m, true) ; continue ; // interference -- just retry } // 栈中的displaced mark word markoop dmw = mark->displaced_mark_helper() ; assert (dmw->is_neutral(), "invariant") ; // 设置monitor的字段 m->set_header(dmw) ; // owner为lock record m->set_owner(mark->locker()); m->set_object(object); ... // 将锁对象头设置为重量级锁状态 object->release_set_mark(markoopdesc::encode(m)); ... return m ; } // case: neutral // 分配以及初始化objectmonitor对象 objectmonitor * m = omalloc (self) ; // prepare m for installation - set monitor to initial state m->recycle(); m->set_header(mark); // owner为null m->set_owner(null); m->set_object(object); m->owneristhread = 1 ; m->_recursions = 0 ; m->_responsible = null ; m->_spinduration = objectmonitor::knob_spinlimit ; // consider: keep metastats by type/class // 用cas替换对象头的mark word为重量级锁状态 if (atomic::cmpxchg_ptr (markoopdesc::encode(m), object->mark_addr(), mark) != mark) { // 不成功说明有另外一个线程在执行inflate,释放monitor对象 m->set_object (null) ; m->set_owner (null) ; m->owneristhread = 0 ; m->recycle() ; omrelease (self, m, true) ; m = null ; continue ; // interference - the markword changed - just retry. // the state-transitions are one-way, so there's no chance of // live-lock -- "inflated" is an absorbing state. } ... return m ; } }
inflate
中是一个for循环,主要是为了处理多线程同时调用inflate的情况。然后会根据锁对象的状态进行不同的处理:
1.已经是重量级状态,说明膨胀已经完成,直接返回
2.如果是轻量级锁则需要进行膨胀操作
3.如果是膨胀中状态,则进行忙等待
4.如果是无锁状态则需要进行膨胀操作
其中轻量级锁和无锁状态需要进行膨胀操作,轻量级锁膨胀流程如下:
1.调用omalloc
分配一个objectmonitor
对象(以下简称monitor),在omalloc
方法中会先从线程私有的monitor
集合omfreelist
中分配对象,如果omfreelist
中已经没有monitor
对象,则从jvm全局的gfreelist
中分配一批monitor
到omfreelist
中。
2.初始化monitor
对象
3.将状态设置为膨胀中(inflating)状态
4.设置monitor
的header字段为displaced mark word
,owner字段为lock record
,obj字段为锁对象
5.设置锁对象头的mark word
为重量级锁状态,指向第一步分配的monitor
对象
无锁状态下的膨胀流程如下:
1.调用omalloc
分配一个objectmonitor
对象(以下简称monitor)
2.初始化monitor
对象
3.设置monitor
的header字段为 mark word
,owner字段为null
,obj字段为锁对象
4.设置锁对象头的mark word
为重量级锁状态,指向第一步分配的monitor
对象
至于为什么轻量级锁需要一个膨胀中(inflating)状态,代码中的注释是:
// why do we cas a 0 into the mark-word instead of just casing the // mark-word from the stack-locked value directly to the new inflated state? // consider what happens when a thread unlocks a stack-locked object. // it attempts to use cas to swing the displaced header value from the // on-stack basiclock back into the object header. recall also that the // header value (hashcode, etc) can reside in (a) the object header, or // (b) a displaced header associated with the stack-lock, or (c) a displaced // header in an objectmonitor. the inflate() routine must copy the header // value from the basiclock on the owner's stack to the objectmonitor, all // the while preserving the hashcode stability invariants. if the owner // decides to release the lock while the value is 0, the unlock will fail // and control will eventually pass from slow_exit() to inflate. the owner // will then spin, waiting for the 0 value to disappear. put another way, // the 0 causes the owner to stall if the owner happens to try to // drop the lock (restoring the header from the basiclock to the object) // while inflation is in-progress. this protocol avoids races that might // would otherwise permit hashcode values to change or "flicker" for an object. // critically, while object->mark is 0 mark->displaced_mark_helper() is stable. // 0 serves as a "busy" inflate-in-progress indicator.
我没太看懂,有知道的同学可以指点下~
膨胀完成之后,会调用enter
方法获得锁
void attr objectmonitor::enter(traps) { thread * const self = thread ; void * cur ; // owner为null代表无锁状态,如果能cas设置成功,则当前线程直接获得锁 cur = atomic::cmpxchg_ptr (self, &_owner, null) ; if (cur == null) { ... return ; } // 如果是重入的情况 if (cur == self) { // todo-fixme: check for integer overflow! bugid 6557169. _recursions ++ ; return ; } // 当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀且第一次调用enter方法,那cur是指向lock record的指针 if (self->is_lock_owned ((address)cur)) { assert (_recursions == 0, "internal state error"); // 重入计数重置为1 _recursions = 1 ; // 设置owner字段为当前线程(之前owner是指向lock record的指针) _owner = self ; owneristhread = 1 ; return ; } ... // 在调用系统的同步操作之前,先尝试自旋获得锁 if (knob_spinearly && tryspin (self) > 0) { ... //自旋的过程中获得了锁,则直接返回 self->_stalled = 0 ; return ; } ... { ... for (;;) { jt->set_suspend_equivalent(); // 在该方法中调用系统同步操作 enteri (thread) ; ... } self->set_current_pending_monitor(null); } ... }
- 如果当前是无锁状态、锁重入、当前线程是之前持有轻量级锁的线程则进行简单操作后返回。
- 先自旋尝试获得锁,这样做的目的是为了减少执行操作系统同步操作带来的开销
- 调用
enteri
方法获得锁或阻塞
enteri
方法比较长,在看之前,我们先阐述下其大致原理:
一个objectmonitor
对象包括这么几个关键字段:cxq(下图中的contentionlist),entrylist ,waitset,owner。
其中cxq ,entrylist ,waitset都是由objectwaiter的链表结构,owner指向持有锁的线程。
当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个objectwaiter
对象插入到cxq的队列的队首,然后调用park
函数挂起当前线程。在linux系统上,park
函数底层调用的是gclib库的pthread_cond_wait
,jdk的reentrantlock
底层也是用该方法挂起线程的。更多细节可以看我之前的两篇文章:,
当线程释放锁时,会从cxq或entrylist中挑选一个线程唤醒,被选中的线程叫做heir presumptive
即假定继承人(应该是这样翻译),就是图中的ready thread
,假定继承人被唤醒后会尝试获得锁,但synchronized
是非公平的,所以假定继承人不一定能获得锁(这也是它叫”假定”继承人的原因)。
如果线程获得锁后调用object#wait
方法,则会将线程加入到waitset中,当被object#notify
唤醒后,会将线程从waitset移动到cxq或entrylist中去。需要注意的是,当调用一个锁对象的wait
或notify
方法时,如当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。
synchronized
的monitor
锁机制和jdk的reentrantlock
与condition
是很相似的,reentrantlock
也有一个存放等待获取锁线程的链表,condition
也有一个类似waitset
的集合用来存放调用了await
的线程。如果你之前对reentrantlock
有深入了解,那理解起monitor
应该是很简单。
回到代码上,开始分析enteri
方法:
void attr objectmonitor::enteri (traps) { thread * self = thread ; ... // 尝试获得锁 if (trylock (self) > 0) { ... return ; } deferredinitialize () ; // 自旋 if (tryspin (self) > 0) { ... return ; } ... // 将线程封装成node节点中 objectwaiter node(self) ; self->_parkevent->reset() ; node._prev = (objectwaiter *) 0xbad ; node.tstate = objectwaiter::ts_cxq ; // 将node节点插入到_cxq队列的头部,cxq是一个单向链表 objectwaiter * nxt ; for (;;) { node._next = nxt = _cxq ; if (atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ; // cas失败的话 再尝试获得锁,这样可以降低插入到_cxq队列的频率 if (trylock (self) > 0) { ... return ; } } // syncflags默认为0,如果没有其他等待的线程,则将_responsible设置为自己 if ((syncflags & 16) == 0 && nxt == null && _entrylist == null) { atomic::cmpxchg_ptr (self, &_responsible, null) ; } tevent (inflated enter - contention) ; int nwakeups = 0 ; int recheckinterval = 1 ; for (;;) { if (trylock (self) > 0) break ; assert (_owner != self, "invariant") ; ... // park self if (_responsible == self || (syncflags & 1)) { // 当前线程是_responsible时,调用的是带时间参数的park tevent (inflated enter - park timed) ; self->_parkevent->park ((jlong) recheckinterval) ; // increase the recheckinterval, but clamp the value. recheckinterval *= 8 ; if (recheckinterval > 1000) recheckinterval = 1000 ; } else { //否则直接调用park挂起当前线程 tevent (inflated enter - park untimed) ; self->_parkevent->park() ; } if (trylock(self) > 0) break ; ... if ((knob_spinafterfutile & 1) && tryspin (self) > 0) break ; ... // 在释放锁时,_succ会被设置为entrylist或_cxq中的一个线程 if (_succ == self) _succ = null ; // invariant: after clearing _succ a thread *must* retry _owner before parking. orderaccess::fence() ; } // 走到这里说明已经获得锁了 assert (_owner == self , "invariant") ; assert (object() != null , "invariant") ; // 将当前线程的node从cxq或entrylist中移除 unlinkafteracquire (self, &node) ; if (_succ == self) _succ = null ; if (_responsible == self) { _responsible = null ; orderaccess::fence(); } ... return ; }
主要步骤有3步:
- 将当前线程插入到cxq队列的队首
- 然后park当前线程
- 当被唤醒后再尝试获得锁
这里需要特别说明的是_responsible
和_succ
两个字段的作用:
当竞争发生时,选取一个线程作为_responsible
,_responsible
线程调用的是有时间限制的park
方法,其目的是防止出现搁浅
现象。
_succ
线程是在线程释放锁是被设置,其含义是heir presumptive
,也就是我们上面说的假定继承人。
重量级锁的释放
重量级锁释放的代码在objectmonitor::exit
:
void attr objectmonitor::exit(bool not_suspended, traps) { thread * self = thread ; // 如果_owner不是当前线程 if (thread != _owner) { // 当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀后还没调用过enter方法,_owner会是指向lock record的指针。 if (thread->is_lock_owned((address) _owner)) { assert (_recursions == 0, "invariant") ; _owner = thread ; _recursions = 0 ; owneristhread = 1 ; } else { // 异常情况:当前不是持有锁的线程 tevent (exit - throw imsx) ; assert(false, "non-balanced monitor enter/exit!"); if (false) { throw(vmsymbols::java_lang_illegalmonitorstateexception()); } return; } } // 重入计数器还不为0,则计数器-1后返回 if (_recursions != 0) { _recursions--; // this is simple recursive enter tevent (inflated exit - recursive) ; return ; } // _responsible设置为null if ((syncflags & 4) == 0) { _responsible = null ; } ... for (;;) { assert (thread == _owner, "invariant") ; // knob_exitpolicy默认为0 if (knob_exitpolicy == 0) { // code 1:先释放锁,这时如果有其他线程进入同步块则能获得锁 orderaccess::release_store_ptr (&_owner, null) ; // drop the lock orderaccess::storeload() ; // see if we need to wake a successor // code 2:如果没有等待的线程或已经有假定继承人 if ((intptr_t(_entrylist)|intptr_t(_cxq)) == 0 || _succ != null) { tevent (inflated exit - simple egress) ; return ; } tevent (inflated exit - complex egress) ; // code 3:要执行之后的操作需要重新获得锁,即设置_owner为当前线程 if (atomic::cmpxchg_ptr (thread, &_owner, null) != null) { return ; } tevent (exit - reacquired) ; } ... objectwaiter * w = null ; // code 4:根据qmode的不同会有不同的唤醒策略,默认为0 int qmode = knob_qmode ; if (qmode == 2 && _cxq != null) { // qmode == 2 : cxq中的线程有更高优先级,直接唤醒cxq的队首线程 w = _cxq ; assert (w != null, "invariant") ; assert (w->tstate == objectwaiter::ts_cxq, "invariant") ; exitepilog (self, w) ; return ; } if (qmode == 3 && _cxq != null) { // 将cxq中的元素插入到entrylist的末尾 w = _cxq ; for (;;) { assert (w != null, "invariant") ; objectwaiter * u = (objectwaiter *) atomic::cmpxchg_ptr (null, &_cxq, w) ; if (u == w) break ; w = u ; } assert (w != null , "invariant") ; objectwaiter * q = null ; objectwaiter * p ; for (p = w ; p != null ; p = p->_next) { guarantee (p->tstate == objectwaiter::ts_cxq, "invariant") ; p->tstate = objectwaiter::ts_enter ; p->_prev = q ; q = p ; } // append the rats to the entrylist // todo: organize entrylist as a cdll so we can locate the tail in constant-time. objectwaiter * tail ; for (tail = _entrylist ; tail != null && tail->_next != null ; tail = tail->_next) ; if (tail == null) { _entrylist = w ; } else { tail->_next = w ; w->_prev = tail ; } // fall thru into code that tries to wake a successor from entrylist } if (qmode == 4 && _cxq != null) { // 将cxq插入到entrylist的队首 w = _cxq ; for (;;) { assert (w != null, "invariant") ; objectwaiter * u = (objectwaiter *) atomic::cmpxchg_ptr (null, &_cxq, w) ; if (u == w) break ; w = u ; } assert (w != null , "invariant") ; objectwaiter * q = null ; objectwaiter * p ; for (p = w ; p != null ; p = p->_next) { guarantee (p->tstate == objectwaiter::ts_cxq, "invariant") ; p->tstate = objectwaiter::ts_enter ; p->_prev = q ; q = p ; } // prepend the rats to the entrylist if (_entrylist != null) { q->_next = _entrylist ; _entrylist->_prev = q ; } _entrylist = w ; // fall thru into code that tries to wake a successor from entrylist } w = _entrylist ; if (w != null) { // 如果entrylist不为空,则直接唤醒entrylist的队首元素 assert (w->tstate == objectwaiter::ts_enter, "invariant") ; exitepilog (self, w) ; return ; } // entrylist为null,则处理cxq中的元素 w = _cxq ; if (w == null) continue ; // 因为之后要将cxq的元素移动到entrylist,所以这里将cxq字段设置为null for (;;) { assert (w != null, "invariant") ; objectwaiter * u = (objectwaiter *) atomic::cmpxchg_ptr (null, &_cxq, w) ; if (u == w) break ; w = u ; } tevent (inflated exit - drain cxq into entrylist) ; assert (w != null , "invariant") ; assert (_entrylist == null , "invariant") ; if (qmode == 1) { // qmode == 1 : 将cxq中的元素转移到entrylist,并反转顺序 objectwaiter * s = null ; objectwaiter * t = w ; objectwaiter * u = null ; while (t != null) { guarantee (t->tstate == objectwaiter::ts_cxq, "invariant") ; t->tstate = objectwaiter::ts_enter ; u = t->_next ; t->_prev = u ; t->_next = s ; s = t; t = u ; } _entrylist = s ; assert (s != null, "invariant") ; } else { // qmode == 0 or qmode == 2‘ // 将cxq中的元素转移到entrylist _entrylist = w ; objectwaiter * q = null ; objectwaiter * p ; for (p = w ; p != null ; p = p->_next) { guarantee (p->tstate == objectwaiter::ts_cxq, "invariant") ; p->tstate = objectwaiter::ts_enter ; p->_prev = q ; q = p ; } } // _succ不为null,说明已经有个继承人了,所以不需要当前线程去唤醒,减少上下文切换的比率 if (_succ != null) continue; w = _entrylist ; // 唤醒entrylist第一个元素 if (w != null) { guarantee (w->tstate == objectwaiter::ts_enter, "invariant") ; exitepilog (self, w) ; return ; } } }
在进行必要的锁重入判断以及自旋优化后,进入到主要逻辑:
code 1
设置owner为null,即释放锁,这个时刻其他的线程能获取到锁。这里是一个非公平锁的优化;
code 2
如果当前没有等待的线程则直接返回就好了,因为不需要唤醒其他线程。或者如果说succ不为null,代表当前已经有个”醒着的”继承人线程,那当前线程不需要唤醒任何线程;
code 3
当前线程重新获得锁,因为之后要操作cxq和entrylist队列以及唤醒线程;
code 4
根据qmode的不同,会执行不同的唤醒策略;
根据qmode的不同,有不同的处理方式:
- qmode = 2且cxq非空:取cxq队列队首的objectwaiter对象,调用exitepilog方法,该方法会唤醒objectwaiter对象的线程,然后立即返回,后面的代码不会执行了;
- qmode = 3且cxq非空:把cxq队列插入到entrylist的尾部;
- qmode = 4且cxq非空:把cxq队列插入到entrylist的头部;
- qmode = 0:暂时什么都不做,继续往下看;
只有qmode=2的时候会提前返回,等于0、3、4的时候都会继续往下执行:
1.如果entrylist的首元素非空,就取出来调用exitepilog方法,该方法会唤醒objectwaiter对象的线程,然后立即返回;
2.如果entrylist的首元素为空,就将cxq的所有元素放入到entrylist中,然后再从entrylist中取出来队首元素执行exitepilog方法,然后立即返回;
qmode默认为0,结合上面的流程我们可以看这么个demo:
public class syncdemo { public static void main(string[] args) { syncdemo syncdemo1 = new syncdemo(); syncdemo1.startthreada(); try { thread.sleep(100); } catch (interruptedexception e) { e.printstacktrace(); } syncdemo1.startthreadb(); try { thread.sleep(100); } catch (interruptedexception e) { e.printstacktrace(); } syncdemo1.startthreadc(); } final object lock = new object(); public void startthreada() { new thread(() -> { synchronized (lock) { system.out.println("a get lock"); try { thread.sleep(500); } catch (interruptedexception e) { e.printstacktrace(); } system.out.println("a release lock"); } }, "thread-a").start(); } public void startthreadb() { new thread(() -> { synchronized (lock) { system.out.println("b get lock"); } }, "thread-b").start(); } public void startthreadc() { new thread(() -> { synchronized (lock) { system.out.println("c get lock"); } }, "thread-c").start(); } }
默认策略下,在a释放锁后一定是c线程先获得锁。因为在获取锁时,是将当前线程插入到cxq的头部,而释放锁时,默认策略是:如果entrylist为空,则将cxq中的元素按原有顺序插入到到entrylist,并唤醒第一个线程。也就是当entrylist为空时,是后来的线程先获取锁。这点jdk中的lock机制是不一样的。
synchronized和reentrantlock的区别
原理弄清楚了,顺便总结了几点synchronized和reentrantlock的区别:
- synchronized是jvm层次的锁实现,reentrantlock是jdk层次的锁实现;
- synchronized的锁状态是无法在代码中直接判断的,但是reentrantlock可以通过
reentrantlock#islocked
判断; - synchronized是非公平锁,reentrantlock是可以是公平也可以是非公平的;
- synchronized是不可以被中断的,而
reentrantlock#lockinterruptibly
方法是可以被中断的; - 在发生异常时synchronized会自动释放锁(由javac编译时自动实现),而reentrantlock需要开发者在finally块中显示释放锁;
- reentrantlock获取锁的形式有多种:如立即返回是否成功的trylock(),以及等待指定时长的获取,更加灵活;
- synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁(上文有说),而reentrantlock对于已经在等待的线程一定是先来的线程先获得锁;
end
总的来说synchronized的重量级锁和reentrantlock的实现上还是有很多相似的,包括其数据结构、挂起线程方式等等。在日常使用中,如无特殊要求用synchronized就够了。你深入了解这两者其中一个的实现,了解另外一个或其他锁机制都比较容易,这也是我们常说的技术上的相通性。
欢迎关注我的公众号
上一篇: 面向对象之:类的成员
下一篇: Decorator:从原理到实践