欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

一文让你读懂Synchronized底层实现,秒杀面试官

程序员文章站 2023-12-02 11:58:10
本文为死磕Synchronized底层实现第三篇文章,内容为轻量级锁实现。 轻量级锁并不复杂,其中很多内容在偏向锁一文中已提及过,与本文内容会有部分重叠。 另外轻量级锁的背景和基本流程在概论中已有讲解。强烈建议在看过两篇文章的基础下阅读本文。 本系列文章将对HotSpot的synchronized锁 ......

本文为死磕synchronized底层实现第三篇文章,内容为轻量级锁实现。

轻量级锁并不复杂,其中很多内容在偏向锁一文中已提及过,与本文内容会有部分重叠。

另外轻量级锁的背景和基本流程在概论中已有讲解。强烈建议在看过两篇文章的基础下阅读本文。

本系列文章将对hotspot的synchronized锁实现进行全面分析,内容包括偏向锁、轻量级锁、重量级锁的加锁、解锁、锁升级流程的原理及源码分析,希望给在研究synchronized路上的同学一些帮助。

本文分为两个部分:

1.轻量级锁获取流程

2.轻量级锁释放流程

本人看的jvm版本是jdk8u。

 

轻量级锁获取流程

下面开始轻量级锁获取流程分析,代码在bytecodeinterpreter.cpp#1816

case(_monitorenter): {
  oop lockee = stack_object(-1);
  ...
  if (entry != null) {
   ...
   // 上面省略的代码中如果cas操作失败也会调用到interpreterruntime::monitorenter

    // traditional lightweight locking
    if (!success) {
      // 构建一个无锁状态的displaced mark word
      markoop displaced = lockee->mark()->set_unlocked();
      // 设置到lock record中去
      entry->lock()->set_displaced_header(displaced);
      bool call_vm = useheavymonitors;
      if (call_vm || atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
        // 如果cas替换不成功,代表锁对象不是无锁状态,这时候判断下是不是锁重入
        // is it simple recursive case?
        if (!call_vm && thread->is_lock_owned((address) displaced->clear_lock_bits())) {
          entry->lock()->set_displaced_header(null);
        } else {
          // cas操作失败则调用monitorenter
          call_vm(interpreterruntime::monitorenter(thread, entry), handle_exception);
        }
      }
    }
    update_pc_and_tos_and_continue(1, -1);
  } else {
    istate->set_msg(more_monitors);
    update_pc_and_return(0); // re-execute
  }
}

 

如果锁对象不是偏向模式或已经偏向其他线程,则successfalse。这时候会构建一个无锁状态的mark word设置到lock record中去,我们称lock record中存储对象mark word的字段叫displaced mark word

如果当前锁的状态不是无锁状态,则cas失败。如果这是一次锁重入,那直接将lock record的 displaced mark word设置为null

我们看个demo,在该demo中重复3次获得锁,

synchronized(obj){
    synchronized(obj){
        synchronized(obj){
        }
    }
}

 

假设锁的状态是轻量级锁,下图反应了mark word和线程栈中lock record的状态,可以看到右边线程栈中包含3个指向当前锁对象的lock record。其中栈中最高位的lock record为第一次获取锁时分配的。其displaced mark word的值为锁对象的加锁前的mark word,之后的锁重入会在线程栈中分配一个displaced mark wordnulllock record

一文让你读懂Synchronized底层实现,秒杀面试官

为什么jvm选择在线程栈中添加displaced mark word为null的lock record来表示重入计数呢?首先锁重入次数是一定要记录下来的,因为每次解锁都需要对应一次加锁,解锁次数等于加锁次数时,该锁才真正的被释放,也就是在解锁时需要用到说锁重入次数的。一个简单的方案是将锁重入次数记录在对象头的mark word中,但mark word的大小是有限的,已经存放不下该信息了。另一个方案是只创建一个lock record并在其中记录重入次数,hotspot没有这样做的原因我猜是考虑到效率有影响:每次重入获得锁都需要遍历该线程的栈找到对应的lock record,然后修改它的值。

所以最终hotspot选择每次获得锁都添加一个lock record来表示锁的重入。

接下来看看interpreterruntime::monitorenter方法

irt_entry_no_async(void, interpreterruntime::monitorenter(javathread* thread, basicobjectlock* elem))
  ...
  handle h_obj(thread, elem->obj());
  assert(universe::heap()->is_in_reserved_or_null(h_obj()),
         "must be null or an object");
  if (usebiasedlocking) {
    // retry fast entry if bias is revoked to avoid unnecessary inflation
    objectsynchronizer::fast_enter(h_obj, elem->lock(), true, check);
  } else {
    objectsynchronizer::slow_enter(h_obj, elem->lock(), check);
  }
  ...
irt_end
fast_enter的流程在偏向锁一文已经分析过,如果当前是偏向模式且偏向的线程还在使用锁,那会将锁的mark word改为轻量级锁的状态,同时会将偏向的线程栈中的lock record修改为轻量级锁对应的形式。代码位置在biasedlocking.cpp#212。

// 线程还存活则遍历线程栈中所有的lock record
  growablearray<monitorinfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
  basiclock* highest_lock = null;
  for (int i = 0; i < cached_monitor_info->length(); i++) {
    monitorinfo* mon_info = cached_monitor_info->at(i);
    // 如果能找到对应的lock record说明偏向的线程还在执行同步代码块中的代码
    if (mon_info->owner() == obj) {
      ...
      // 需要升级为轻量级锁,直接修改偏向线程栈中的lock record。为了处理锁重入的case,在这里将lock record的displaced mark word设置为null,第一个lock record会在下面的代码中再处理
      markoop mark = markoopdesc::encode((basiclock*) null);
      highest_lock = mon_info->lock();
      highest_lock->set_displaced_header(mark);
    } else {
      ...
    }
  }
  if (highest_lock != null) {
    // 修改第一个lock record为无锁状态,然后将obj的mark word设置为指向该lock record的指针
    highest_lock->set_displaced_header(unbiased_prototype);
    obj->release_set_mark(markoopdesc::encode(highest_lock));
    ...
  } else {
    ...
  }

 

我们看slow_enter的流程。

void objectsynchronizer::slow_enter(handle obj, basiclock* lock, traps) {
  markoop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  // 如果是无锁状态
  if (mark->is_neutral()) {
    //设置displaced mark word并替换对象头的mark word
    lock->set_displaced_header(mark);
    if (mark == (markoop) atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
      tevent (slow_enter: release stacklock) ;
      return ;
    }
  } else
  if (mark->has_locker() && thread->is_lock_owned((address)mark->locker())) {
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (basiclock*)obj->mark(), "don't relock with same basiclock");
    // 如果是重入,则设置displaced mark word为null
    lock->set_displaced_header(null);
    return;
  }

  ...
  // 走到这一步说明已经是存在多个线程竞争锁了 需要膨胀为重量级锁
  lock->set_displaced_header(markoopdesc::unused_mark());
  objectsynchronizer::inflate(thread, obj())->enter(thread);
}

 

轻量级锁释放流程

case(_monitorexit): {
  oop lockee = stack_object(-1);
  check_null(lockee);
  // derefing's lockee ought to provoke implicit null check
  // find our monitor slot
  basicobjectlock* limit = istate->monitor_base();
  basicobjectlock* most_recent = (basicobjectlock*) istate->stack_base();
  // 从低往高遍历栈的lock record
  while (most_recent != limit ) {
    // 如果lock record关联的是该锁对象
    if ((most_recent)->obj() == lockee) {
      basiclock* lock = most_recent->lock();
      markoop header = lock->displaced_header();
      // 释放lock record
      most_recent->set_obj(null);
      // 如果是偏向模式,仅仅释放lock record就好了。否则要走轻量级锁or重量级锁的释放流程
      if (!lockee->mark()->has_bias_pattern()) {
        bool call_vm = useheavymonitors;
        // header!=null说明不是重入,则需要将displaced mark word cas到对象头的mark word
        if (header != null || call_vm) {
          if (call_vm || atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
            // cas失败或者是重量级锁则会走到这里,先将obj还原,然后调用monitorexit方法
            most_recent->set_obj(lockee);
            call_vm(interpreterruntime::monitorexit(thread, most_recent), handle_exception);
          }
        }
      }
      //执行下一条命令
      update_pc_and_tos_and_continue(1, -1);
    }
    //处理下一条lock record
    most_recent++;
  }
  // need to throw illegal monitor state exception
  call_vm(interpreterruntime::throw_illegal_monitor_state_exception(thread), handle_exception);
  shouldnotreachhere();
}

 

轻量级锁释放时需要将displaced mark word替换到对象头的mark word中。如果cas失败或者是重量级锁则进入到interpreterruntime::monitorexit方法中。

//%note monitor_1
irt_entry_no_async(void, interpreterruntime::monitorexit(javathread* thread, basicobjectlock* elem))
 
  handle h_obj(thread, elem->obj());
  ...
  objectsynchronizer::slow_exit(h_obj(), elem->lock(), thread);
  // free entry. this must be done here, since a pending exception might be installed on
  //释放lock record
  elem->set_obj(null);
  ...
irt_end

 

monitorexit调用完slow_exit方法后,就释放lock record

void objectsynchronizer::slow_exit(oop object, basiclock* lock, traps) {
  fast_exit (object, lock, thread) ;
}
void objectsynchronizer::fast_exit(oop object, basiclock* lock, traps) {
  ...
  markoop dhw = lock->displaced_header();
  markoop mark ;
  if (dhw == null) {
     // 重入锁,什么也不做
        ...
     return ;
  }

  mark = object->mark() ;

  // 如果是mark word==displaced mark word即轻量级锁,cas替换对象头的mark word
  if (mark == (markoop) lock) {
     assert (dhw->is_neutral(), "invariant") ;
     if ((markoop) atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
        tevent (fast_exit: release stacklock) ;
        return;
     }
  }
  //走到这里说明是重量级锁或者解锁时发生了竞争,膨胀后调用重量级锁的exit方法。
  objectsynchronizer::inflate(thread, object)->exit (true, thread) ;
}

 

该方法中先判断是不是轻量级锁,如果是轻量级锁则将替换mark word,否则膨胀为重量级锁并调用exit方法,相关逻辑将在重量级锁的文章中讲解。

 

本人免费整理了java高级资料,涵盖了java、redis、mongodb、mysql、zookeeper、spring cloud、dubbo高并发分布式等教程,一共30g,需要自己领取。
传送门:https://mp.weixin.qq.com/s/jzddfh-7ynudmkjt0irl8q