欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

浅谈Java 并发的底层实现

程序员文章站 2023-12-05 17:12:22
并发编程的目的是让程序运行更快,但是使用并发并不定会使得程序运行更快,只有当程序的并发数量达到一定的量级的时候才能体现并发编程的优势。所以谈并发编程在高并发量的时候才有意义...

并发编程的目的是让程序运行更快,但是使用并发并不定会使得程序运行更快,只有当程序的并发数量达到一定的量级的时候才能体现并发编程的优势。所以谈并发编程在高并发量的时候才有意义。虽然目前还没有开发过高并发量的程序,但是学习并发是为了更好理解一些分布式架构。那么当程序的并发量不高,比如是单线程的程序,单线程的执行效率反而比多线程更高。这又是为什么呢?熟悉操作系统的应该知道,cpu是通过给每个线程分配时间片的方式实现多线程的。这样,当cpu从一个任务切换到另一个任务的时候,会保存上一个任务的状态,当执行完这个任务的时候cpu就会继续上一个任务的状态继续执行。这个过程称为上下文切换。

在java多线程中,volatile关键字个synchronized关键字扮演了重要的角色,它们都可以实现线程的同步,但是在底层是如何实现的呢?

volatile

volatile只能保证变量对各个线程的可见性,但不能保证原子性。关于 java语言 volatile 的使用方法就不多说了,我的建议是 除了 配合package java.util.concurrent.atomic 中的类库,其他情况一概别用。更多的解释 参见 这篇文章。

引子

参见如下代码

 package org.go;

public class go {

  volatile int i = 0;

  private void inc() {
    i++;
  }

  public static void main(string[] args) {
    go go = new go();
    for (int i = 0; i < 10; i++) {
      new thread(() -> {
        for (int j = 0; j < 1000; j++)
          go.inc();
      }).start();
    }
    while(thread.activecount()>1){
      thread.yield();
    }
    system.out.println(go.i);
  }
}
 

每次执行上述代码结果都不同,输出的数字总是小于10000.这是因为在进行inc()的时候,i++并不是原子操作。或许有些人会提议说用 synchronized 来同步inc() , 或者 用 package java.util.concurrent.locks 下的锁去控制线程同步。但它们都不如下面的解决方案:

 package org.go;

import java.util.concurrent.atomic.atomicinteger;

public class go {

  atomicinteger i = new atomicinteger(0);

  private void inc() {
    i.getandincrement();
  }

  public static void main(string[] args) {
    go go = new go();
    for (int i = 0; i < 10; i++) {
      new thread(() -> {
        for (int j = 0; j < 1000; j++)
          go.inc();
      }).start();
    }
    while(thread.activecount()>1){
      thread.yield();
    }
    system.out.println(go.i);
  }
}

这时,如果你不了解 atomic 的实现,你一定会不屑的怀疑 说不定 atomicinteger 底层就是使用锁来实现的所以也未必高效。那么究竟是什么,我们来看看。

原子类的内部实现

无论是atomicinteger 或者是 concurrentlinkedqueue的节点类concurrentlinkedqueue.node,他们都有个静态变量
 private static final sun.misc.unsafe unsafe;,这个类是实现原子语义的c++对象sun::misc::unsafe的java封装。想看看底层实现,正好我手边有gcc4.8的源代码,对照本地路径,很方便找到github的路径,看这里。

以接口 getandincrement()的实现举例

atomicinteger.java

 private static final unsafe unsafe = unsafe.getunsafe();
public final int getandincrement() {
    for (;;) {
      int current = get();
      int next = current + 1;
      if (compareandset(current, next))
        return current;
    }
  }

public final boolean compareandset(int expect, int update) {
      return unsafe.compareandswapint(this, valueoffset, expect, update);
    } 

留意这个for循环,只有在compareandset成功时才会返回。否则就一直compareandset。

调用了compareandset实现。此处,我注意到 oracle jdk的实现是略有不同的,如果你查看jdk下的src,你可以看到oracle jdk是调用的unsafe的getandincrement(),但我相信oracle jdk实现unsafe.java的时候应该也是只调用compareandset,因为一个compareandset就可以实现增加、减少、设值的原子操作了。

unsafe.java

 public native boolean compareandswapint(object obj, long offset,
                     int expect, int update); 

通过jni调用的c++的实现。

natunsafe.cc

 jboolean
sun::misc::unsafe::compareandswapint (jobject obj, jlong offset,
           jint expect, jint update)
{
 jint *addr = (jint *)((char *)obj + offset);
 return compareandswap (addr, expect, update);
}

static inline bool
compareandswap (volatile jint *addr, jint old, jint new_val)
{
 jboolean result = false;
 spinlock lock;
 if ((result = (*addr == old)))
  *addr = new_val;
 return result;
} 

unsafe::compareandswapint调用 static 函数 compareandswap。而compareandswap又使用spinlock作为锁。这里的spinlock有lockguard的意味,构造时加锁,析构时释放。

我们需要聚焦在spinlock里。这里是保证spinlock释放之前都是原子操作的真正实现。

什么是spinlock

spinlock,即自旋锁,一种循环等待(busy waiting)以获取资源的锁。不同于mutex的阻塞当前线程、释放cpu资源以等待需求的资源,spinlock不会进入挂起、等待条件满足、重新竞争cpu的过程。这意味着只有在 等待锁的代价小于线程执行上下文切换的代价时,spinlock才优于mutex。

natunsafe.cc

 class spinlock
{
 static volatile obj_addr_t lock;

public:

spinlock ()
 {
  while (! compare_and_swap (&lock, 0, 1))
   _jv_threadyield ();
 }
 ~spinlock ()
 {
  release_set (&lock, 0);
 }
}; 

以一个静态变量 static volatile obj_addr_t lock; 作为标志位,通过c++ raii实现一个guard,所以所谓的锁其实是 静态成员变量obj_addr_t lock,c++中volatile 并不能保证同步,保证同步的是构造函数里调用的 compare_and_swap和一个static变量lock.这个lock变量是1的时候,就需要等;是0的时候,就通过原子操作把它置为1,表示自己获得了锁。

这里会用一个static变量实在是一个意外,如此相当于所有的无锁结构都共用同一个变量(实际就是size_t)来区分是否加锁。当这个变量置为1时,其他用到spinlock的都需要等。 为什么不在sun::misc::unsafe添加一个私有变量 volatile obj_addr_t lock;,并作为构造参数传给spinlock?这样相当于每个unsafe共享一个标志位,效果会不会好一些?

_jv_threadyield在下面的文件里,通过系统调用sched_yield(man 2 sched_yield)让出cpu资源。宏have_sched_yield在configure里定义,意味着编译时如果取消定义,spinlock就称为真正意义的自旋锁了。

posix-threads.h

 inline void
_jv_threadyield (void)
{
#ifdef have_sched_yield
 sched_yield ();
#endif /* have_sched_yield */
}

这个lock.h在不同平台有着不同的实现,我们以ia64(intel amd x64)平台举例,其他的实现可以在 这里 看到。

ia64/locks.h
 

typedef size_t obj_addr_t;
inline static bool
compare_and_swap(volatile obj_addr_t *addr,
             obj_addr_t old,
             obj_addr_t new_val)
{
 return __sync_bool_compare_and_swap (addr, old, new_val);
}

inline static void
release_set(volatile obj_addr_t *addr, obj_addr_t new_val)
{
 __asm__ __volatile__("" : : : "memory");
 *(addr) = new_val;
} 

__sync_bool_compare_and_swap 是gcc内建函数,汇编指令"memory"完成内存屏障。

  1. 一般地,如果cpu硬件支持指令 cmpxchg (该指令从硬件保障原子性,毫无疑问十分高效),那么__sync_bool_compare_and_swap就应该是用cmpxchg来实现的。
  2. 不支持cmpxchg的cpu架构 可以用lock指令前缀,通过锁cpu总线的方式实现。
  3. 如果连lock指令都不支持,有可能通过apic实现

总之,硬件上保证多核cpu同步,而unsafe的实现也是尽可能的高效。gcc-java的还算高效,相信oracle 和 openjdk不会更差。

原子操作 和 gcc内建的原子操作

原子操作

java的表达式以及c++的表达式,都不是原子操作,也就是说 你在代码里:

 //假设i是线程间共享的变量
i++; 

在多线程环境下,i的访问是非原子性的,实际分成如下三个操作数:

  1. 从缓存取到寄存器
  2. 在寄存器加1
  3. 存入缓存

编译器会改变执行的时序,因此执行结果可能并非所期望的。

gcc内建的原子操作

gcc内建了如下的原子操作,这些原子操作从4.1.2被加入。而之前,他们是使用内联的汇编实现的。

 type __sync_fetch_and_add (type *ptr, type value, ...)
type __sync_fetch_and_sub (type *ptr, type value, ...)
type __sync_fetch_and_or (type *ptr, type value, ...)
type __sync_fetch_and_and (type *ptr, type value, ...)
type __sync_fetch_and_xor (type *ptr, type value, ...)
type __sync_fetch_and_nand (type *ptr, type value, ...)

type __sync_add_and_fetch (type *ptr, type value, ...)
type __sync_sub_and_fetch (type *ptr, type value, ...)
type __sync_or_and_fetch (type *ptr, type value, ...)
type __sync_and_and_fetch (type *ptr, type value, ...)
type __sync_xor_and_fetch (type *ptr, type value, ...)
type __sync_nand_and_fetch (type *ptr, type value, ...)

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

__sync_synchronize (...)

type __sync_lock_test_and_set (type *ptr, type value, ...)

void __sync_lock_release (type *ptr, ...) 

需要注意的是:

  1.  __sync_fetch_and_add 和 __sync_add_and_fetch 的关系 对应于 i++ 和 ++i。其他类推
  2.  cas的两种实现,bool版本的 如果对比oldval与ptr成功并给ptr设值newval 返回true;另一个 返回 原本*ptr的值
  3.  __sync_synchronize 添加一个完全的内存屏障

openjdk 的相关文件

下面列出一些github上 openjdk9的原子操作实现,希望能帮助需要了解的人。毕竟openjdk比gcc的实现应用更广泛一些。————但终究没有oracle jdk的源码,虽然据说openjdk与 oracle的源码差距很小。

atomicinteger.java

unsafe.java::compareandexchangeobject

unsafe.cpp::unsafe_compareandexchangeobject

oop.inline.hpp::oopdesc::atomic_compare_exchange_oop

atomic_linux_x86.hpp::atomic::cmpxchg

 inline jlong  atomic::cmpxchg  (jlong  exchange_value, volatile jlong*  dest, jlong  compare_value, cmpxchg_memory_order order) {
 bool mp = os::is_mp();
 __asm__ __volatile__ (lock_if_mp(%4) "cmpxchgq %1,(%3)"
            : "=a" (exchange_value)
            : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
            : "cc", "memory");
 return exchange_value;
} 

这里需要给不熟悉c/c++的java程序员提示一下,嵌入汇编指令的格式如下

 __asm__ [__volatile__](assembly template//汇编模板
    : [output operand list]//输入列表
    : [input operand list]//输出列表
    : [clobber list])//破坏列表 

汇编模板中的%1,%3,%4对应于后面的参数列表{"r" (exchange_value),"r" (dest),"r" (mp)},参数列表以逗号分隔,从0排序。输出参数放第一个冒号右边,输出参数放第二个冒号右边。"r"表示放到通用寄存器,"a"表示寄存器eax,有"="表示用于输出(写还)。cmpxchg指令隐含使用eax寄存器即参数%2.

其他细节就不在此罗列了,gcc的实现是把要交换的指针传下来,对比成功后直接赋值(赋值非原子),原子性通过spinlock保证。 

openjdk的实现是把要交换的指针传下来,直接通过汇编指令cmpxchgq赋值,原子性通过汇编指令保证。当然gcc的spinlock底层也是通过cmpxchgq保证的。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。