java停止线程的方法(创建线程的三种方法)
一.引言
“操作系统的线程状态和java的线程状态有什么关系?”这是校招时被问到的一个问题。当时只顾着看博文、面经等零散的资料,没有形成系统的知识体系,一时语塞,答的不是很对。在网上也没找到足够细致地讲解博文,于是整理出了这篇内容。
java的线程状态牵扯到了同步语义,要探讨java的线程状态的,必不可免要回顾其锁机制。因此本文的主要分为两大块:一是synchronized源码粗析,分析了各类锁的进入、释放、升级过程,并大致说明了monitor原理;二是介绍了线程的实现方式和java线程状态转换的部分细节。
二. synchronized锁
java采用synchronized关键字、以互斥同步的方式的解决线程安全问题,那么什么是线程安全呢?这里引用《java并发编程实战》作者brian goetz给出的定义:
当多个线程同时访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那就称这个对象是线程安全的。—— brian goetz
2.1 synchronized的使用
先写过个demo,大致过一下synchronized的使用,包含同步代码块、实例方法和静态方法。
public synchronized void test1(){
}
public void test2(){
synchronized(new test()){
}
}
public static synchronized void test3(){
}
反编译可查看字节码:
public synchronized void test1();
descriptor: ()v
flags: acc_public, acc_synchronized // here
public void test2();
descriptor: ()v
flags: acc_public
code:
stack=2, locals=3, args_size=1
0: new #2 // class com/easy/helloworld/test
3: dup
4: invokespecial #3 // method "<init>":()v
7: dup
8: astore_1
9: monitorenter // here
10: aload_1
11: monitorexit // here
12: goto 20
15: astore_2
16: aload_1
17: monitorexit // here
18: aload_2
19: athrow
20: return
public static synchronized void test3();
descriptor: ()v
flags: acc_public, acc_static, acc_synchronized // here
可以观察到:
- 同步代码:通过moniterenter、moniterexit 关联到到一个monitor对象,进入时设置owner为当前线程,计数+1、退出-1。除了正常出口的 monitorexit,还在异常处理代码里插入了 monitorexit。
- 实例方法:隐式调用moniterenter、moniterexit
- 静态方法:隐式调用moniterenter、moniterexit
2.2 moniterenter、moniterexit
monitorenter和monitorexit这两个jvm指令,主要是基于 mark word 和 object monitor 来实现的。
在 jvm 中,对象在内存中分为三块区域:
- 对象头:由 mark word 和 klass point 构成。
- 1. mark word(标记字段): 用于存储对象自身的运行时数据,例如存储对象的hashcode,分代年龄、锁标志位等信息,是synchronized实现轻量级锁和偏向锁的关键。64位jvm的mark word组成如下:
- 2. klass point(类型指针): 对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。
- 实例数据:这部分主要是存放类的数据信息,父类的信息。
- 字节对齐:为了内存的io性能,jvm要求对象起始地址必须是8字节的整数倍。对于不对齐的对象,需要填充数据进行对齐。
在jdk 1.6之前, synchronized 只有传统的锁机制,直接关联到 monitor 对象,存在性能上的瓶颈。在jdk 1.6后,为了提高锁的获取与释放效率,jvm引入了两种锁机制:偏向锁和轻量级锁。它们的引入是为了解决在没有多线程竞争或基本没有竞争的场景下因使用传统锁机制带来的性能开销问题。这几种锁的实现和转换正是依靠对象头中的 mark word 。
本章内容近万字,时间不充裕的同学可以直接看 本章小节 。
2.3 偏向锁
引入偏向锁的目的:在没有多线程竞争的情况下,尽量减少不必要的轻量级锁的执行。轻量级锁的获取及释放依赖多次cas原子指令,而偏向锁只依赖一次cas原子指令。但在多线程竞争时,需要进行偏向锁撤销步骤,因此其撤销的开销必须小于节省下来的cas开销,否则偏向锁并不能带来收益。jdk 1.6中默认开启偏向锁,可以通过-xx:-usebiasedlocking来禁用偏向锁。
2.3.1 进入偏向锁
关于hotspot虚拟机中获取锁的入口,网上主要有两种看法:一为interpreterruntime.cpp#monitorenter#1608;二为bytecodeinterpreter.cpp#1816。在hotspot的中,有两处地方对 monitorenter 指令进行解析:一个是bytecodeinterpreter.cpp#1816 ,另一个在templatetable_x86_64.cpp#3667。其中, bytecodeinterpreter 是jvm中的字节码解释器, templateinterpreter 为模板解释器。hotspot对运行效率有着极其执着的追求,显然会倾向于用模板解释器来实现。r大的读书笔记中有说明,hotspot中只用到了模板解释器,并没有用到字节码解释器。因此,本文认为 montorenter 的解析入口为templatetable_x86_64.cpp#3667。
但模板解释器 templateinterpreter 都是汇编代码,不易读,且实现逻辑与字节码解释器 bytecodeinterpreter 大体一致。因此本文的源码都以 bytecodeinterpreter 来说明,借此窥探 synchronized 的实现原理。在看代码之前,先介绍几个在偏向锁中会被大量应用的概念,以便后续理解:
prototype_header :jvm中的每个类有一个类似 mark word 的 prototype_header ,用来标记该class的 epoch 和偏向开关等信息。
匿名偏向状态 :锁对象mark word标志位为101,且存储的 thread id 为空时的状态(即锁对象为偏向锁,且没有线程偏向于这个锁对象)。
atomic::cmpxchg_ptr :cas函数。这个方法有三个参数,依次为 exchange_value 、 dest 、 compare_value 。如果dest的值为 compare_value 则更新为 exchange_value ,并返回 compare_value 。否则,不更新并返回 实际原值 。
接下来开始源码实现分析,hotspot中偏向锁的具体实现可参考bytecodeinterpreter.cpp#1816,代码如下:
case(_monitorenter): {
//锁对象
oop lockee = stack_object(-1);
// derefing's lockee ought to provoke implicit null check
check_null(lockee);
// 步骤1
// 在栈中找到第一个空闲的lock record
// 会找到栈中最高的
basicobjectlock* limit = istate->monitor_base();
basicobjectlock* most_recent = (basicobjectlock*) istate->stack_base();
basicobjectlock* entry = null;
while (most_recent != limit ) {
if (most_recent->obj() == null) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
// entry不为null,代表还有空闲的lock record
if (entry != null) {
// 将lock record的obj指针指向锁对象
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place = (uintptr_t)markoopdesc::epoch_mask_in_place;
// markoop即对象头的mark word
markoop mark = lockee->mark();
intptr_t hash = (intptr_t) markoopdesc::no_hash;
// 步骤2
// implies usebiasedlocking
// 如果为偏向模式,即判断标识位是否为101
if (mark->has_bias_pattern()) {
...
// 一顿操作
anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markoopdesc::age_mask_in_place);
// 步骤3
if (anticipated_bias_locking_value == 0) {
// already biased towards this thread, nothing to do
// 偏向的是自己,啥都不做
if (printbiasedlockingstatistics) {
(* biasedlocking::biased_lock_entry_count_addr())++;
}
success = true;
}
// class的prototype_header不是偏向模式
else if ((anticipated_bias_locking_value & markoopdesc::biased_lock_mask_in_place) != 0) {
// 尝试撤销偏向
...
}
// epoch过期,重新偏向
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
// try rebias
...
success = true;
}
else {
// try to bias towards thread in case object is anonymously biased
// 尝试偏向该线程,只有匿名偏向能成功
// 构建了匿名偏向的mark word
markoop header = (markoop) ((uintptr_t) mark & ((uintptr_t)markoopdesc::biased_lock_mask_in_place |(uintptr_t)markoopdesc::age_mask_in_place |epoch_mask_in_place));
if (hash != markoopdesc::no_hash) {
header = header->copy_set_hash(hash);
}
// 用「或」操作设置thread id
markoop new_header = (markoop) ((uintptr_t) header | thread_ident);、
// 只有匿名偏向才能成功
if (atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
// cas修改成功
if (printbiasedlockingstatistics)
(* biasedlocking::anonymously_biased_lock_entry_count_addr())++;
}
else {
// 失败说明存在竞争,进入monitorenter
call_vm(interpreterruntime::monitorenter(thread, entry), handle_exception);
}
success = true;
}
}
// 步骤4
// traditional lightweight locking
// false走轻量级锁逻辑
if (!success) {
// 构造一个无锁状态的displaced mark word,并将lock record指向它
markoop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
bool call_vm = useheavymonitors;
if (call_vm || atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 如果cas替换不成功,代表锁对象不是无锁状态,这时候判断下是不是锁重入
// is it simple recursive case?
if (!call_vm && thread->is_lock_owned((address) displaced->clear_lock_bits())) {
// 如果是锁重入,则直接将displaced mark word设置为null
// 轻量级锁重入是使用lock record的数量来计入的
entry->lock()->set_displaced_header(null);
} else {
call_vm(interpreterruntime::monitorenter(thread, entry), handle_exception);
}
}
}
update_pc_and_tos_and_continue(1, -1);
} else {
// 没拿到lock record,重新执行
istate->set_msg(more_monitors);
update_pc_and_return(0); // re-execute
}
}
偏向锁流程:
步骤 1 、从当前线程的栈中找到一个空闲的 lock record ,并指向当前锁对象。
步骤 2 、获取对象的markoop数据mark,即对象头的mark word;
步骤 3 、判断锁对象的 mark word 是否是偏向模式,即第3位是否为101。若不是,进入步骤4。若是,计算
anticipated_bias_locking_value ,判断偏向状态:
步骤 3.1 、
anticipated_bias_locking_value 若为0,代表 偏向的线程是当前线程 且 mark word 的epoch等于class的epoch,这种情况下直接执行同步代码块,什么都不用做。
步骤 3.2 、判断class的 prototype_header 是否为非偏向模式。若为非偏向模式,cas尝试将对象恢复为无锁状态。无论cas是否成功都会进入轻量级锁逻辑。
步骤 3.3 、如果epoch偏向 时间戳已过期 ,则需要重偏向。利用cas指令将锁对象的 mark word 替换为一个偏向当前线程且epoch为类的epoch的新的 mark word 。
步骤 3.4 、cas将偏向线程改为当前线程,如果当前是 匿名偏向 (即对象头中的bit field存储的thread id为空)且 无并发冲突 ,则能 修改成功 获取偏向锁,否则进入 锁升级 的逻辑。
步骤 4 、走到一步会进行轻量级锁逻辑。构造一个无锁状态的 mark word ,然后存储到 lock record 。设置为无锁状态的原因是:轻量级锁解锁时是将对象头的 mark word cas替换为 lock record 中的 displaced mark word ,所以设置为无锁状态。如果是锁重入,则将 lock record 的 displaced mark word 设置为null,放到栈帧中,起到计数作用。
以上是偏向锁加锁的大致流程,如果当前锁 已偏向其他线程 || epoch值过期 || class偏向模式关闭 || 获取偏向锁的过程中存在并发冲突 ,都会进入到
interpreterruntime::monitorenter 方法, 在该方法中会进行偏向锁撤销和升级。流程如下图所示:
issue:有的同学可能会问了,对象一开始不是无锁状态吗,为什么上述偏向锁逻辑没有判断 无锁状态的锁对象 (001)?
只有匿名偏向的对象才能进入偏向锁模式。偏向锁是延时初始化的,默认是4000ms。初始化后会将所有加载的klass的prototype header修改为匿名偏向样式。当创建一个对象时,会通过klass的prototype_header来初始化该对象的对象头。简单的说,偏向锁初始化结束后,后续所有对象的对象头都为 匿名偏向 样式,在此之前创建的对象则为 无锁状态 。而对于无锁状态的锁对象,如果有竞争,会直接进入到轻量级锁。这也是为什么jvm启动前4秒对象会直接进入到轻量级锁的原因。
为什么需要延迟初始化?
jvm启动时必不可免会有大量sync的操作,而偏向锁并不是都有利。如果开启了偏向锁,会发生大量锁撤销和锁升级操作,大大降低jvm启动效率。
因此,我们可以明确地说,只有锁对象处于 匿名偏向 状态,线程才能拿到到我们通常意义上的偏向锁。而处于无锁状态的锁对象,只能进入到轻量级锁状态。
2.3.2 偏向锁的撤销
偏向锁的 撤销 (revoke)是一个很特殊的操作,为了执行撤销操作,需要等待 全局安全点 ,此时所有的工作线程都停止了执行。偏向锁的撤销操作并不是将对象恢复到无锁可偏向的状态,而是在偏向锁的获取过程中,发现竞争时,直接将一个被偏向的对象 升级到 被加了轻量级锁的状态。这个操作的具体完成方式如下:
irt_entry_no_async(void, interpreterruntime::monitorenter(javathread* thread, basicobjectlock* elem))
...
handle h_obj(thread, elem->obj());
assert(universe::heap()->is_in_reserved_or_null(h_obj()),
"must be null or an object");
// 开启了偏向锁
if (usebiasedlocking) {
// retry fast entry if bias is revoked to avoid unnecessary inflation
objectsynchronizer::fast_enter(h_obj, elem->lock(), true, check);
} else {
objectsynchronizer::slow_enter(h_obj, elem->lock(), check);
}
...
如果开启了jvm偏向锁,则会进入到
objectsynchronizer::fast_enter 方法中。
void objectsynchronizer::fast_enter(handle obj, basiclock* lock, bool attempt_rebias, traps) {
//再次校验
if (usebiasedlocking) {
if (!safepointsynchronize::is_at_safepoint()) {
//不在安全点的执行
biasedlocking::condition cond = biasedlocking::revoke_and_rebias(obj, attempt_rebias, thread);
if (cond == biasedlocking::bias_revoked_and_rebiased) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward vm thread");
//批量撤销,底层调用bulk_revoke_or_rebias_at_safepoint
biasedlocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter (obj, lock, thread) ;
}
主要看
biasedlocking::revoke_and_rebias 方法。这个方法的主要作用像它的方法名:撤销或者重偏向。第一个参数封装了锁对象和当前线程,第二个参数代表是否允许重偏向,这里是true。
biasedlocking::condition biasedlocking::revoke_and_rebias(handle obj, bool attempt_rebias, traps) {
assert(!safepointsynchronize::is_at_safepoint(), "must not be called while at safepoint");
markoop mark = obj->mark(); //获取锁对象的对象头
if (mark->is_biased_anonymously() && !attempt_rebias) {
// 如果锁对象为匿名偏向状态且不允许重偏向下,进入该分支。在一个非全局安全点进行偏向锁撤销
markoop biased_value = mark;
// 创建一个匿名偏向的markword
markoop unbiased_prototype = markoopdesc::prototype()->set_age(mark->age());
// 通过cas重新设置偏向锁状态
markoop res_mark = (markoop) atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {// 如果cas成功,返回偏向锁撤销状态
return bias_revoked;
}
} else if (mark->has_bias_pattern()) {
// 锁为偏向模式(101)会走到这里
klass* k = obj->klass();
markoop prototype_header = k->prototype_header();
// 如果对应class关闭了偏向模式
if (!prototype_header->has_bias_pattern()) {
markoop biased_value = mark;
// cas更新对象头markword为非偏向锁
markoop res_mark = (markoop) atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return bias_revoked; // 返回偏向锁撤销状态
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
// 如果epoch过期,则进入当前分支
if (attempt_rebias) {
// 如果允许重偏
assert(thread->is_java_thread(), "");
markoop biased_value = mark;
markoop rebiased_prototype = markoopdesc::encode((javathread*) thread, mark->age(), prototype_header->bias_epoch());
// 通过cas操作, 将本线程的 threadid 、时间戳、分代年龄尝试写入对象头中
markoop res_mark = (markoop) atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) { //cas成功,则返回撤销和重新偏向状态
return bias_revoked_and_rebiased;
}
} else {
// 如果不允许尝试获取偏向锁,进入该分支取消偏向
// 通过cas操作更新分代年龄
markoop biased_value = mark;
markoop unbiased_prototype = markoopdesc::prototype()->set_age(mark->age());
markoop res_mark = (markoop) atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) { //如果cas操作成功,返回偏向锁撤销状态
return bias_revoked;
}
}
}
}
//执行到这里有以下两种情况:
//1.对象不是偏向模式
//2.上面的cas操作失败
heuristicsresult heuristics = update_heuristics(obj(), attempt_rebias);
if (heuristics == hr_not_biased) {
// 非偏向从这出去
// 轻量级锁、重量级锁
return not_biased;
} else if (heuristics == hr_single_revoke) {
// 撤销单个线程
// mark,最常见的执行分支
// mark,最常见的执行分支
// mark,最常见的执行分支
klass *k = obj->klass();
markoop prototype_header = k->prototype_header();
if (mark->biased_locker() == thread &&
prototype_header->bias_epoch() == mark->bias_epoch()) {
// 偏向当前线程且不过期
// 这里撤销的是偏向当前线程的锁,调用object#hashcode方法时也会走到这一步
// 因为只要遍历当前线程的栈就能拿到lock record了,所以不需要等到safe point再撤销。
resourcemark rm;
if (tracebiasedlocking) {
tty->print_cr("revoking bias by walking my own stack:");
}
biasedlocking::condition cond = revoke_bias(obj(), false, false, (javathread*) thread);
((javathread*) thread)->set_cached_monitor_info(null);
assert(cond == bias_revoked, "why not?");
return cond;
} else {
// 下面代码最终会在safepoint调用revoke_bias方法撤销偏向
vm_revokebias revoke(&obj, (javathread*) thread);
vmthread::execute(&revoke);
return revoke.status_code();
}
}
assert((heuristics == hr_bulk_revoke) ||
(heuristics == hr_bulk_rebias), "?");
//批量撤销、批量重偏向的逻辑
vm_bulkrevokebias bulk_revoke(&obj, (javathread*) thread,
(heuristics == hr_bulk_rebias),
attempt_rebias);
vmthread::execute(&bulk_revoke);
return bulk_revoke.status_code();
}
这块代码注释写的算是比较清楚,只简单介绍下最常见的情况:锁已经偏向线程a,此时线程b尝试获取锁。这种情况下会走到mark标记的分支。如果需要撤销的是当前线程,只要遍历当前线程的栈就能拿到lock record,可以直接调用 revoke_bias ,不需要等到safe point再撤销。在调用object#hashcode时,也会走到该分支将为偏向锁的锁对象直接恢复为无锁状态。若不是当前线程,会被push到vm thread中等到 safepoint 的时候再执行。
vmthread内部维护了一个vmoperationqueue类型的队列,用于保存内部提交的vm线程操作vm_operation。gc、偏向锁的撤销等操作都是在这里被执行。
撤销调用的 revoke_bias 方法的代码就不贴了。大致逻辑是:
步骤 1 、查看偏向的线程是否存活,如果已经死亡,则直接撤销偏向锁。jvm维护了一个集合存放所有存活的线程,通过遍历该集合判断某个线程是否存活。
步骤 2 、偏向的线程是否还在同步块中,如果不在,则撤销偏向锁。如果在同步块中,执行步骤3。这里 是否在同步块的判断 基于上文提到的偏向锁的重入计数方式:在偏向锁的获取中,每次进入同步块的时候都会在栈中找到第一个可用(即栈中最高的)的 lock record ,将其obj字段指向锁对象。每次解锁的时候都会把最低的 lock record 移除掉,所以可以通过遍历线程栈中的 lock record 来判断是否还在同步块中。轻量级锁的重入也是基于 lock record 的计数来判断。
步骤 3 、升级为轻量级锁。将偏向线程所有相关 lock record 的 displaced mark word 设置为null,再将最高位的 lock record 的 displaced mark word 设置为无锁状态,然后将对象头指向最高位的 lock record 。这里没有用到cas指令,因为是在 safepoint ,可以直接升级成轻量级锁。
2.3.3 偏向锁的释放
偏向锁的释放可参考bytecodeinterpreter.cpp#1923,这里也不贴了。偏向锁的释放只要将对应 lock record 释放就好了,但这里的释放并不会将mark word里面的thread id去掉,这样做是为了下一次更方便的加锁。而轻量级锁则需要将 displaced mark word 替换到对象头的mark word中。如果cas失败或者是重量级锁则进入到
interpreterruntime::monitorexit 方法中。
2.3.4 批量重偏向与撤销
从上节偏向锁的加锁解锁过程中可以看出,当只有一个线程反复进入同步块时,偏向锁带来的性能开销基本可以忽略,但是当有其他线程尝试获得锁时,就需要等到 safe point 时将偏向锁撤销为无锁状态或升级为轻量级/重量级锁。因此,jvm中增加了一种批量重偏向/撤销的机制以减少锁撤销的开销,而mark word中的epoch也是在这里被大量应用,这里不展开说明。但无论怎么优化,偏向锁的撤销仍有一定不可避免的成本。如果业务场景存在大量多线程竞争,那偏向锁的存在不仅不能提高性能,而且会导致性能下降( 偏向锁并不都有利,jdk15默认不开启 )。
2.4 轻量级锁
引入轻量级锁的目的:在多线程交替执行同步块的情况下,尽量避免重量级锁使用的操作系统互斥量带来的开销,但是如果多个线程在同一时刻进入临界区,会导致轻量级锁膨胀升级重量级锁,所以轻量级锁的出现并非是要替代重量级锁。
2.4.1 进入轻量级锁
轻量级锁在上文或多或少已经涉及到,其获取流程入口为bytecodeinterpreter.cpp#1816。前大半部分都是偏向锁逻辑,还有一部分为轻量级锁逻辑。在偏向锁逻辑中,cas失败会执行到
interpreterruntime::monitorenter 。在轻量级锁逻辑中,如果当前线程不是轻量级锁的重入,也会执行到
interpreterruntime::monitorenter 。我们再看看
interpreterruntime::monitorenter 方法:
irt_entry_no_async(void, interpreterruntime::monitorenter(javathread* thread, basicobjectlock* elem))
...
handle h_obj(thread, elem->obj());
assert(universe::heap()->is_in_reserved_or_null(h_obj()),
"must be null or an object");
if (usebiasedlocking) {
// retry fast entry if bias is revoked to avoid unnecessary inflation
objectsynchronizer::fast_enter(h_obj, elem->lock(), true, check);
} else {
objectsynchronizer::slow_enter(h_obj, elem->lock(), check);
}
...
irt_end
fast_enter 的流程在偏向锁的撤销小节中已经分析过,主要逻辑为 revoke_and_rebias :如果当前是偏向模式且偏向的线程还在使用锁,会将锁的 mark word 改为轻量级锁的状态,并将偏向的线程栈中的 lock record 修改为轻量级锁对应的形式(此时lock record是无锁状态),且返回值不是 bias_revoked_and_rebiased ,会继续执行 slow_enter 。
我们直接看 slow_enter 的流程:
void objectsynchronizer::slow_enter(handle obj, basiclock* lock, traps) {
// 步骤1
markoop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
// 步骤2
// 如果为无锁状态
if (mark->is_neutral()) {
// 步骤3
// 设置mark word到栈
lock->set_displaced_header(mark);
// cas更新指向栈中lock record的指针
if (mark == (markoop) atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
tevent (slow_enter: release stacklock) ;
return ;
}
// fall through to inflate() ... cas失败走下面锁膨胀方法
} else if (mark->has_locker() && thread->is_lock_owned((address)mark->locker())) {
// 步骤4
// 为轻量级锁且owner为当前线程
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (basiclock*)obj->mark(), "don't relock with same basiclock");
// 设置displaced mark word为null,重入计数用
lock->set_displaced_header(null);
return;
}
// 步骤5
// 走到这一步说明已经是存在多个线程竞争锁了,需要膨胀或已经是重量级锁
lock->set_displaced_header(markoopdesc::unused_mark());
// 进入、膨胀到重量级锁的入口
// 膨胀后再调用monitor的enter方法竞争锁
objectsynchronizer::inflate(thread, obj())->enter(thread);
}
步骤 1 、 markoop mark = obj->mark() 方法获取对象的markoop数据mark;
步骤 2 、 mark->is_neutral() 方法判断mark是否为无锁状态,标识位 001 ;
步骤 3 、如果mark处于无锁状态,把mark保存到basiclock对象(lock record的属性)的displaced_header字段;
步骤 3.1 、通过cas尝试将mark word更新为指向basiclock对象的指针,如果更新成功,表示竞争到锁,则执行同步代码,否则执行步骤4;
步骤 4 、如果是重入,则设置displaced mark word为null。
步骤 5 、到这说明有多个线程竞争轻量级锁,轻量级锁需要膨胀升级为重量级锁;
结合上文偏向锁的流程,可以整理得到如下的流程图:
2.4.2 轻量级锁的释放
轻量级锁释放的入口在bytecodeinterpreter.cpp#1923。
轻量级锁释放时需要将 displaced mark word 替换回对象头的 mark word 中。如果cas失败或者是重量级锁则进入到
interpreterruntime::monitorexit 方法中。 monitorexit 直接调用 slow_exit 方法释放 lock record 。直接看 slow_exit :
irt_entry_no_async(void, interpreterruntime::monitorexit(javathread* thread, basicobjectlock* elem))
handle h_obj(thread, elem->obj());
assert(universe::heap()->is_in_reserved_or_null(h_obj()),
"must be null or an object");
if (elem == null || h_obj()->is_unlocked()) {
throw(vmsymbols::java_lang_illegalmonitorstateexception());
}
// 直接调用slow_exit
objectsynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// free entry. this must be done here, since a pending exception might be installed on
// exit. if it is not cleared, the exception handling code will try to unlock the monitor again.
elem->set_obj(null);
irt_end
void objectsynchronizer::slow_exit(oop object, basiclock* lock, traps) {
fast_exit (object, lock, thread) ;
}
void objectsynchronizer::fast_exit(oop object, basiclock* lock, traps) {
...
// displaced header就是对象mark word的拷贝
markoop dhw = lock->displaced_header();
markoop mark ;
if (dhw == null) {
// 什么也不做
// recursive stack-lock. 递归堆栈锁
// diagnostics -- could be: stack-locked, inflating, inflated.
...
return ;
}
mark = object->mark() ;
// 此处为轻量级锁的释放过程,使用cas方式解锁。
// 如果对象被当前线程堆栈锁定,尝试将displaced header和锁对象中的markword替换回来。
// if the object is stack-locked by the current thread, try to
// swing the displaced header from the box back to the mark.
if (mark == (markoop) lock) {
assert (dhw->is_neutral(), "invariant") ;
if ((markoop) atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
tevent (fast_exit: release stacklock) ;
return;
}
}
//走到这里说明已经是重量级锁或者解锁时发生了竞争,膨胀后再调用monitor的exit方法释放
objectsynchronizer::inflate(thread, object)->exit (true, thread) ;
}
最后执行的是如果是fast_exit方法。如果是轻量级锁,尝试cas替换 mark word 。若解锁时有竞争,会调用 inflate 方法进行重量级锁膨胀,升级到到重量级锁后再执行 exit 方法。
2.5 重量级锁
2.5.1 重量级锁的进入
重量级锁通过对象内部的监视器(monitor)实现,其依赖于底层操作系统的 mutex lock 实现,需要额外的用户态到内核态切换的开销。由上文分析, slow_enter 获取轻量级锁未成功时,会在 inflate 中完成锁膨胀:
objectmonitor * attr objectsynchronizer::inflate (thread * self, oop object) {
...
for (;;) {
const markoop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
// mark是以下状态中的一种:
// * inflated(重量级锁状态) - 直接返回
// * stack-locked(轻量级锁状态) - 膨胀
// * inflating(膨胀中) - 忙等待直到膨胀完成
// * neutral(无锁状态) - 膨胀
// * biased(偏向锁) - 非法状态,在这里不会出现
// case: inflated
if (mark->has_monitor()) {
// 已经是重量级锁状态了,直接返回
objectmonitor * inf = mark->monitor() ;
...
return inf ;
}
// case: inflation in progress
if (mark == markoopdesc::inflating()) {
// 正在膨胀中,说明另一个线程正在进行锁膨胀,continue重试
tevent (inflate: spin while inflating) ;
// 在该方法中会进行spin/yield/park等操作完成自旋动作
readstablemark(object) ;
continue ;
}
// 当前是轻量级锁,后面分析
// case: stack-locked
if (mark->has_locker()) {
...
}
// 无锁状态
// case: neutral
// 分配以及初始化objectmonitor对象
objectmonitor * m = omalloc (self) ;
// prepare m for installation - set monitor to initial state
m->recycle();
m->set_header(mark);
// owner为null
m->set_owner(null);
m->set_object(object);
m->owneristhread = 1 ;
m->_recursions = 0 ;
m->_responsible = null ;
m->_spinduration = objectmonitor::knob_spinlimit ; // consider: keep metastats by type/class
// 用cas替换对象头的mark word为重量级锁状态
if (atomic::cmpxchg_ptr (markoopdesc::encode(m), object->mark_addr(), mark) != mark) {
// 不成功说明有另外一个线程在执行inflate,释放monitor对象
m->set_object (null) ;
m->set_owner (null) ;
m->owneristhread = 0 ;
m->recycle() ;
omrelease (self, m, true) ;
m = null ;
continue ;
// interference - the markword changed - just retry.
// the state-transitions are one-way, so there's no chance of
// live-lock -- "inflated" is an absorbing state.
}
...
return m ;
}
inflate 其中是一个for循环,主要是为了处理多线程同时调用inflate的情况。然后会根据锁对象的状态进行不同的处理:
- 已经是重量级状态,说明膨胀已经完成,返回并继续执行objectmonitor::enter方法。
- 如果是轻量级锁则需要进行膨胀操作。
- 如果是膨胀中状态,则进行忙等待。
- 如果是无锁状态则需要进行膨胀操作。
轻量级锁膨胀流程如下:
if (mark->has_locker()) {
// 步骤1
// 当前轻量级锁状态,先分配一个objectmonitor对象,并初始化值
objectmonitor * m = omalloc (self) ;
m->recycle();
m->_responsible = null ;
m->owneristhread = 0 ;
m->_recursions = 0 ;
m->_spinduration = objectmonitor::knob_spinlimit ; // consider: maintain by type/class
// 步骤2
// 将锁对象的mark word设置为inflating (0)状态
markoop cmp = (markoop) atomic::cmpxchg_ptr (markoopdesc::inflating(), object->mark_addr(), mark) ;
if (cmp != mark) {
omrelease (self, m, true) ;
continue ; // interference -- just retry
}
// 步骤3
// 栈中的displaced mark word
markoop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
// 设置monitor的字段
m->set_header(dmw) ;
// owner为lock record
m->set_owner(mark->locker());
m->set_object(object);
...
// 步骤4
// 将锁对象头设置为重量级锁状态
object->release_set_mark(markoopdesc::encode(m));
...
return m ;
}
步骤 1 、调用 omalloc 获取一个可用的 objectmonitor 对象。在 omalloc 方法中会先从 线程私有 的 monitor 集合 omfreelist 中分配对象,如果 omfreelist 中已经没有 monitor 对象,则从 jvm全局 的 gfreelist 中分配一批 monitor 到 omfreelist 中;
步骤 2 、通过cas尝试将mark word设置为markoopdesc:inflating,标识当前锁正在膨胀中。如果cas失败,说明同一时刻其它线程已经将mark word设置为markoopdesc:inflating,当前线程进行自旋等待膨胀完成。
步骤 3 、如果cas成功,设置monitor的各个字段:设置 monitor 的header字段为 displaced mark word ,owner字段为 lock record ,obj字段为锁对象等;
步骤 4 、设置锁对象头的 mark word 为重量级锁状态,指向第一步分配的 monitor 对象;
2.5.2 monitor竞争
当锁膨胀 inflate 执行完并返回对应的 objectmonitor 时,并不表示该线程竞争到了锁,真正的锁竞争发生在 objectmonitor::enter 方法中。
void attr objectmonitor::enter(traps) {
thread * const self = thread ;
void * cur ;
// 步骤1
// owner为null,如果能cas设置成功,则当前线程直接获得锁
cur = atomic::cmpxchg_ptr (self, &_owner, null) ;
if (cur == null) {
...
return ;
}
// 如果是重入的情况
if (cur == self) {
// todo-fixme: check for integer overflow! bugid 6557169.
_recursions ++ ;
return ;
}
// 步骤2
// 如果当前线程是之前持有轻量级锁的线程
// 上节轻量级锁膨胀将owner指向之前lock record的指针
// 这里利用owner判断是否第一次进入。
if (self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
// 重入计数重置为1
_recursions = 1 ;
// 设置owner字段为当前线程
_owner = self ;
owneristhread = 1 ;
return ;
}
...
// 步骤3
// 在调用系统的同步操作之前,先尝试自旋获得锁
if (knob_spinearly && tryspin (self) > 0) {
...
//自旋的过程中获得了锁,则直接返回
self->_stalled = 0 ;
return ;
}
...
{
...
// 步骤4
for (;;) {
jt->set_suspend_equivalent();
// 在该方法中调用系统同步操作
enteri (thread) ;
...
}
self->set_current_pending_monitor(null);
}
...
}
步骤 1 、当前是无锁、锁重入,简单操作后返回。
步骤 2 、当前线程是之前持有轻量级锁的线程,则为首次进入,设置recursions为1,owner为当前线程,该线程成功获得锁并返回。
步骤 3 、先 自旋尝试 获得锁,尽可能减少同步操作带来的开销。
步骤 4 、调用enteri方法。
这里注意,轻量级锁膨胀成功时,会把owner字段设置为 lock record 的指针,并在竞争时判断。这么做的原因是,假设当前线程a持有锁对象的锁,线程b进入同步代码块,并把锁对象升级为重量级锁。但此时,线程a可能还在执行,并无法感知其持有锁对象的变化。因此,需要线程b在执行 objectmonitor::enter 时,将自己放入到阻塞等列等待。并需要线程a第二次进入、或者退出的时候对monitor进行一些操作,以此保证代码块的同步。
这里有个 自旋 操作,直接看 tryspin 对应的方法:
// tryspin对应的方法
int objectmonitor::tryspin_varyduration (thread * self) {
// dumb, brutal spin. good for comparative measurements against adaptive spinning.
int ctr = knob_fixedspin ; // 固定自旋次数
if (ctr != 0) {
while (--ctr >= 0) {
if (trylock (self) > 0) return 1 ;
spinpause () ;
}
return 0 ;
}
// 上一次自旋次数
for (ctr = knob_prespin + 1; --ctr >= 0 ; ) {
if (trylock(self) > 0) { // 尝试获取锁
// increase _spinduration ...
// note that we don't clamp spinduration precisely at spinlimit.
// raising _spurduration to the poverty line is key.
int x = _spinduration ;
if (x < knob_spinlimit) {
if (x < knob_poverty) x = knob_poverty ;
_spinduration = x + knob_bonusb ;
}
return 1 ;
}
...
...
从方法名和注释可以看出,这就是自适应自旋, 和网上说的轻量级锁cas失败会自旋的说法并不一致 。实际上,无论是轻量级锁cas自旋还是重量级锁cas自旋,都是在用户态尽可能减少同步操作带来的开销,并没有太多本质上的区别。到此为止,我们可以再结合上述的内容,整理出如下的状态转换图:
2.5.3 monitor等待
objectmonitor 竞争失败的线程,通过自旋执行 objectmonitor::enteri 方法等待锁的释放,enteri方法的部分逻辑实现如下:
void attr objectmonitor::enteri (traps) {
// 尝试自旋
if (tryspin (self) > 0) {
...
return ;
}
...
// 将线程封装成node节点中
objectwaiter node(self) ;
self->_parkevent->reset() ;
node._prev = (objectwaiter *) 0xbad ;
node.tstate = objectwaiter::ts_cxq ;
// 将node节点插入到_cxq队列的头部,cxq是一个单向链表
objectwaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// cas失败的话 再尝试获得锁,这样可以降低插入到_cxq队列的频率
if (trylock (self) > 0) {
...
return ;
}
}
...
}
enteri大致原理:一个 objectmonitor 对象包括两个同步队列( _cxq 和 _entrylist ) ,以及一个等待队列 _waitset 。cxq、entrylist 、waitset都是由objectwaiter构成的链表结构。其中, _cxq 为单向链表, _entrylist 为双向链表。
当一个线程尝试获得重量级锁且没有竞争到时,该线程会被封装成一个 objectwaiter 对象插入到cxq的队列的队首,然后调用 park 函数挂起当前线程,进入blocked状态。当线程释放锁时,会根据唤醒策略,从cxq或entrylist中挑选一个线程 unpark 唤醒。如果线程获得锁后调用 object#wait 方法,则会将线程加入到waitset中,进入waiting或timed_waiting状态。当被 object#notify 唤醒后,会将线程从waitset移动到cxq或entrylist中去,进入blocked状态。需要注意的是,当调用一个锁对象的 wait 或 notify 方法时,若当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。
2.5.4 monitor释放
当某个持有锁的线程执行完同步代码块时,会进行锁的释放。在hotspot中,通过退出monitor的方式实现锁的释放,并通知被阻塞的线程,具体实现位于 objectmonitor::exit 方法中。
void attr objectmonitor::exit(bool not_suspended, traps) {
thread * self = thread ;
// 如果_owner不是当前线程
if (thread != _owner) {
// 轻量级锁膨胀上来,还没调用过enter方法,_owner还指向之前轻量级锁lock record的指针。
if (thread->is_lock_owned((address) _owner)) {
assert (_recursions == 0, "invariant") ;
_owner = thread ;
_recursions = 0 ;
owneristhread = 1 ;
} else {
// 异常情况:当前不是持有锁的线程
tevent (exit - throw imsx) ;
assert(false, "non-balanced monitor enter/exit!");
if (false) {
throw(vmsymbols::java_lang_illegalmonitorstateexception());
}
return;
}
}
// 重入计数器还不为0,则计数器-1后返回
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
tevent (inflated exit - recursive) ;
return ;
}
...
//这块开始是唤醒操作
for (;;) {
...
...
objectwaiter * w = null ;
// 根据qmode的不同会有不同的唤醒策略,默认为0
int qmode = knob_qmode ;
if (qmode == 2 && _cxq != null) {
...
...
步骤 1 、处理owner不是当前线程的状况。这里特指之前持有轻量级锁的线程,由于没有调用过enter,owner指向仍为lock record的指针,以及其他异常情况。
步骤 2 、重入计数器还不为0,则计数器-1后返回。
步骤 3 、唤醒操作。根据不同的策略(由qmode指定),从cxq或entrylist中获取头节点,通过 objectmonitor::exitepilog 方法唤醒该节点封装的线程,唤醒操作最终由unpark完成。
根据qmode的不同(默认为0),有不同的处理方式:
qmode = 0:暂时什么都不做;
qmode = 2且cxq非空:取cxq队列队首的objectwaiter对象,调用exitepilog方法,该方法会唤醒objectwaiter对象的线程,然后立即返回,后面的代码不会执行了;
qmode = 3且cxq非空:把cxq队列插入到entrylist的尾部;
qmode = 4且cxq非空:把cxq队列插入到entrylist的头部;
只 在qmode=2的时候会提前返回,等于0、3、4的时继续执行:
- 如果entrylist的首元素非空,就取出来调用exitepilog方法,该方法会唤醒objectwaiter对象的线程,然后立即返回;
- 如果entrylist的首元素为空,就将cxq的所有元素放入到entrylist中,然后再从entrylist中取出来队首元素执行exitepilog方法,然后立即返回;
- 被唤醒的线程,继续竞争monitor。
2.6 本章小节
本章介绍了synchronized的底层实现和锁升级过程。对于锁升级,再看看本文整理的图,一图胜千言:
这里有几个点可以注意一下:
- hotspot中,只用到了 模板解释器 ,并没有用到字节码解释器, monitorenter 的实际入口位于templatetable_x86_64.cpp#3667。本文的分析是基于字节码解释器的,因此部分结论不能作为实际执行情况。本章的内容只能作为synchronized锁升级原理、各类锁的适用场景的一种 窥探 。
- 再次强调,无锁状态只能升级为轻量级锁, 匿名偏向状态 才能进入到偏向锁。
- 偏向锁 并不都有利, 其适用于 单个线程重入 的场景,原因为:偏向锁的撤销需要进入 safepoint ,开销较大。需要进入 safepoint 是由于,偏向锁的撤销需要对锁对象的 lock record 进行操作,而 lock record 要到每个线程的栈帧中遍历寻找。在非safepoint,栈帧是动态的,会引入更多的问题。目前看来,偏向锁存在的价值是为历史遗留的collection类如vector和hashtable等做优化,迟早药丸。java 15中默认不开启。
- 执行object类的 hashcode 方法,偏向锁撤销并且锁会膨胀为轻量级锁或者重量锁。执行object类的 wait/notify/notifyall 方法,偏向锁撤销并膨胀成重量级锁。
- 轻量级锁适用于 两个线程的交替执行 场景:线程a进入轻量级锁,退出同步代码块并释放锁,会将锁对象恢复为无锁状态;线程b再进入锁,发现为无锁状态,会cas尝试获取该锁对象的轻量级锁。如果有竞争,则直接膨胀为重量级锁,没有自旋操作,详情看10。
- 唤醒策略依赖于 qmode 。重量级锁获取失败后,线程会加入cxq队列。当线程释放锁时,会从cxq或entrylist中挑选一个线程唤醒。线程获得锁后调用 object#wait 方法,则会将线程加入到waitset中。当被 object#notify 唤醒后,会将线程从waitset移动到cxq或entrylist中去。
- 重量级锁,会将线程放进等待队列,等待操作系统调度。而偏向锁和轻量级锁,未交由操作系统调度,依然处于用户态,只是采用cas无锁竞争的方式获取锁。cas通过unsafe类中compareandswap方法,jni调用c++方法,通过汇编指令锁住cpu中的北桥信号。
- 许多文章声称一个对象关联到一个monitor,这个说法不够准确。如果对象已经是重量级锁了,对象头的确指向了一个 monitor 。但对于正在膨胀的锁,会先从 线程私有 的 monitor 集合 omfreelist 中分配对象。如果 omfreelist 中已经没有 monitor 对象,再从 jvm全局 的 gfreelist 中分配一批 monitor 到 omfreelist 中。
- 在编译期间还有 锁消除 和 锁粗化 这两步锁优化操作,本章没做介绍。
- 字节码实现中没有体现轻量级锁自旋逻辑。这可能是模板解释器中的实现,或者是jvm在不同平台、不同jvm版本的不同实现。但本文分析的字节码链路中没有发现该逻辑,倒是发现了 重量级锁会自适应自旋竞争锁 。因此个人对轻量级锁自适应自旋的说法存疑,至少hotspot jdk8u字节码实现中没有这个逻辑。但两者都是在用户态进行自适应自旋,以尽可能减少同步操作带来的开销,没有太多本质上的区别,并不需要特别关心。
三、线程的实现与状态转换
3.1 线程的实现
(1)内核线程实现
内核线程(kernel-level thread,klt):由 内核 来完成线程切换,内核通过 调度器 对线程进行调度,并负责将线程的任务 映射 到各个处理器上。程序一般不会直接去使用内核线程,而是使用内核线程的一种高级接口—— 轻量级进程 (light weight process,lwp),也就是通常意义上的线程。
优点:每个lwp都是独立的调度单元。一个lwp被阻塞,不影响其他lwp。
缺点:基于klt,耗资源。线程的创建、析构、同步都需要进行系统调用,频繁的用户态、内核态切换。
(2) 用户线程实现(user thread,ut)
广义:非内核线程,都可认为是用户线程。(包括lwt,虽然lwt的大多操作都要映射到klt)
狭义:完全建立在 用户空间 的线程库上,系统内核不能感知线程存在的实现。ut也只感知到掌管这些ut的进程p。
优点:用户线程的创建、同步、销毁和调度完全在用户态中完成,不需要内核的帮助。
缺点:线程的创建、销毁、切换和调度都是用户必须考虑到问题。“阻塞如何处理”、“多处理器系统中如何将线程映射到其他处理器上”这类问题解决起来将会异常困难。
(3) 混合实现 混合模式下, 即存在用户线程,也存在轻量级进程 。用户线程的创建、切换、析构等操作依然廉价,可以支持大规模的用户线程并发,且可以使用内核线程提供的线程调度功能及处理器映射。
线程的实现依赖操作系统支持的线程模型。在主流的操作系统上,hotspot、classic、art等虚拟机默认是 1:1的线程模型。在solaris平台上,hotspot支持1:1、n:m两种线程模型。
3.2 线程的转换
首先明确一点,当我们讨论一个线程的状态,指的是thread 类中threadstatus的值。
private volatile int threadstatus = 0;
该值映射后对应的枚举为:
public enum state {
new,
runnable,
blocked,
waiting,
timed_waiting,
terminated;
}
也就是说,线程的具体状态,看threadstatus就行了。
new
先要创建thread 类的对象,才能谈其状态。
thread t = new thread();
这个时候,线程t就处于新建状态。但他还不是“线程”。
runnable
然后调用start()方法。
t.start();
调用start()后,会执行一个native方法创建内核线程,以linux为例:
private native void start0();
// 最后走到这
hotspot/src/os/linux/vm/os_linux.cpp
pthread_create(...);
这时候才有一个真正的线程创建出来,并即刻开始运行。这个内核线程与线程t进行1:1的映射。这时候t具备运行能力,进入runnable状态。runnable可以细分为ready和running,两者的区别只是是否等待到了资源并开始运行。处于runnable且未运行的线程,会进入一个就绪队列中,等待操作系统的调度。处于就绪 队列的线程都在等待资源,这个资源可以是cpu的时间片、也可以是系统的io。jvm并不关系ready和running这两种状态,毕竟上述的枚举类都不对runnable进行细分。
terminated
当一个线程执行完毕(或者调用已经不建议的 stop 方法),线程的状态就变为 terminated。进入terminated后,线程的状态不可逆,无法再复活。
关于blocked、waiting、timed_waiting
blocked、waiting、timed_waiting都是带有同步语义的状态,我们先看一下 wait 和 notify 方法的底层实现。wait方法的底层实现:
void objectsynchronizer::wait(handle obj, jlong millis, traps) {
...
...
//获得object的monitor对象
objectmonitor* monitor = objectsynchronizer::inflate(thread, obj());
dtrace_monitor_wait_probe(monitor, obj(), thread, millis);
//调用monitor的wait方法
monitor->wait(millis, true, thread);
...
}
inline void objectmonitor::addwaiter(objectwaiter* node) {
...
if (_waitset == null) {
//_waitset为null,就初始化_waitset
_waitset = node;
node->_prev = node;
node->_next = node;
} else {
//否则就尾插
objectwaiter* head = _waitset ;
objectwaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
主要流程:通过object获得objectmonitor,将thread封装成ojectwaiter对象,然后 addwaiter 将它插入 waitset 中,进入waiting或timed_waiting状态。最后释放锁,并通过底层的 park 方法挂起线程;
notify方法的底层实现:
void objectsynchronizer::notify(handle obj, traps) {
...
...
objectsynchronizer::inflate(thread, obj())->notify(thread);
}
//通过inflate方法得到objectmonitor对象
objectmonitor * attr objectsynchronizer::inflate (thread * self, oop object) {
...
if (mark->has_monitor()) {
objectmonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (objectsynchronizer::verify_objmon_isinpool(inf), "monitor is inva;lid");
return inf
}
...
}
//调用objectmonitor的notify方法
void objectmonitor::notify(traps) {
...
//调用dequeuewaiter方法移出_waiterset第一个结点
objectwaiter * iterator = dequeuewaiter() ;
//将上面dequeuewaiter尾插入_entryset或cxq等操作
...
...
}
通过object获得objectmonitor,调用objectmonitor的 notify 方法。这个notify最后会走到
objectmonitor::dequeuewaiter 方法,获取waitset列表中的第一个objectwaiter节点。并根据不同的策略,将取出来的objectwaiter节点,加入到 entrylist 或 cxq 中。 notifyall 的实现类似于 notify ,主要差别在多了个for循环。
由这里以及上一章2.5.4 monitor释放小节中可以了解到, notify 和 notifyall 并不会立即释放所占有的objectmonitor对象,其真正释放objectmonitor的时间点是在执行 monitorexit 指令。
一旦释放 objectmonitor 对象了, entrylist 和 cxq 中的objectwaiter节点会依据 qmode 所配置的策略,通过exitepilog方法唤醒取出来的objectwaiter节点。被唤醒的线程,继续参与monitor的竞争。若竞争失败,重新进入blocked状态,再回顾一下monitor的核心结构。
既然聊到了 wait 和 notify ,那顺便也看下 join 、 sleep 和 park 。
打开 thread.join() 的源码:
public final synchronized void join(long millis) throws interruptedexception {
...
if (millis == 0) {
while (isalive()) {
wait(0);
}
} else {
while (isalive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = system.currenttimemillis() - base;
}
}
}
join 的本质仍然是 wait() 方法。在使用 join 时,jvm会帮我们隐式调用 notify ,因此我们不需要主动notify唤醒主线程。而 sleep() 方法最终是调用 sleepevent 对象的park方法:
int os::sleep(thread* thread, jlong millis, bool interruptible) {
//获取thread中的_sleepevent对象
parkevent * const slp = thread->_sleepevent ;
...
//如果是允许被打断
if (interruptible) {
//记录下当前时间戳,这是时间比较的基准
jlong prevtime = javatimenanos();
for (;;) {
//检查打断标记,如果打断标记为true,则直接返回
if (os::is_interrupted(thread, true)) {
return os_intrpt;
}
//线程被唤醒后的当前时间戳
jlong newtime = javatimenanos();
//睡眠毫秒数减去当前已经经过的毫秒数
millis -= (newtime - prevtime) / nanosecs_per_millisec;
//如果小于0,那么说明已经睡眠了足够多的时间,直接返回
if (millis <= 0) {
return os_ok;
}
//更新基准时间
prevtime = newtime;
//调用_sleepevent对象的park方法,阻塞线程
slp->park(millis);
}
} else {
//如果不能打断,除了不再返回os_intrpt以外,逻辑是完全相同的
for (;;) {
...
slp->park(millis);
...
}
return os_ok ;
}
}
thread.sleep 在jvm层面上是调用thread中 sleepevent 对象的 park() 方法实现阻塞线程,在此过程中会通过判断时间戳来决定线程的睡眠时间是否达到了指定的毫秒。看到这里,对于 sleep 和 wait 的区别应该会有更深入的理解。
park 、 unpark 方法也与同步语义无关。每个线程都与一个许可(permit)关联。 unpark 函数为线程提供permit,线程调用 park 函数则等待并消耗permit。park和unpark方法具体实现比较复杂,这里不展开。到此为止,我们可以整理出如下的线程状态转换图。
3.3 本章小节
java 将os经典五种状态中的ready和running,统一为 runnable。将waiting(即不可能得到 cpu 运行机会的状态)细分为了 blocked、waiting、timed_waiting。本章的内容较为简短,因为部分的内容已囊括在第一章中。
这里提个会使人困惑的问题:使用socket时,调用accept(),read() 等阻塞方法时,线程处于什么状态?
答 案是java线程处于runnable状态,os线程处于waiting状态。因为在jvm层面,等待cpu时间片和等待io资源是等价的。
这里有几个点可以注意一下:
- jvm线程状态不代表内核线程状态;
- blocked的线程一定处于entrylist或cxq中,而处于waiting和timed waiting的线程,可能是由于执行了sleep或park进入该状态,不一定在waitset中。也就是说,处于blocked状态的线程一定是与同步相关。由这可延伸出,调用 jdk 的 lock并获取不到锁的线程,进入的是 waiting 或 timed_waiting 状态,而不是blocked状态。