欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发编程之Java8如何优化CAS性能

程序员文章站 2022-03-08 18:59:22
...
举个例子,比如说10个线程分别对data执行一次data++操作,我们以为最后data的值会变成10,其实不是。因为多线程并发操作下,就是会有这种安全问题,导致数据结果不准确。
public class Test {

   private int data=0;
    //多个线程对一个data不停的累加1操作
}

这里解释下为啥会得不到 10(知道的可直接跳过), i++ 这个操作,计算机需要分成三步来执行。

1、读取 i 的值。
2、把 i 加 1.
3、把 最终 i 的结果写入内存之中。

所以,(1)、假如线程 A 读取了 i 的值为 i = 0,(2)、这个时候线程 B 也读取了 i 的值 i = 0。(3)、接着 A把 i 加 1,然后写入内存,此时 i = 1。(4)、紧接着,B也把 i 加 1,此时线程B中的 i = 1,然后线程 B 把 i 写入内存,此时内存中的 i = 1。也就是说,线程 A, B 都对 i 进行了自增,但最终的结果却是 1,不是 2.

解决方案一:使用synchronized

public class Test {

   private int data=0;

   public synchronized void increment(){
       data++;
    }
    //多个线程调用increment()
}

加了synchronized,代码就是线程安全的了,也就是让每个线程要进入increment()方法之前先得尝试加锁,同一时间只有一个线程能加锁,其他线程需要等待锁,jdk确实做了很多优化,增加了偏向锁、轻量级锁,当线程增多线程竞争带来的开销依然很大,性能不高。

解决方案二:Atomic原子类及其底层原理

java并发包下面提供了一系列的Atomic原子类,比如说AtomicInteger。

public class Test {
    private AtomicInteger data=new AtomicInteger(0);
    //多个线程调用data.incrementAndGet()
}

incrementAndGet底层源码:

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

CAS(Compare and Set)即先比较再设置

compareAndSwapInt对应操作系统的一条硬件操作指令,尽管看似有很多操作在里面,但操作系统能够保证他是原子执行的。

 

虽然这种 CAS 的机制能够保证线程安全,但依然有一些问题。例如,当线程A即将要执行第三步的时候,线程 B 把 i 的值加1,之后又马上把 i 的值减 1,然后,线程 A 执行第三步,这个时候线程 A 是认为并没有人修改过 i 的值,因为 i 的值并没有发生改变。而这,就是我们平常说的ABA问题

对于基本类型的值来说,这种把数字改变了在改回原来的值是没有太大影响的,但如果是对于引用类型的话,就会产生很大的影响了。

为了解决这个 ABA 的问题,我们可以引入版本控制,例如,每次有线程修改了引用的值,就会进行版本的更新,虽然两个线程持有相同的引用,但他们的版本不同,这样,我们就可以预防 ABA 问题了。Java 中提供了 AtomicStampedReference 这个类,就可以进行版本控制了。

java8对CAS的优化

大量的线程同时并发修改一个AtomicInteger,可能有很多线程会不停的自旋,进入一个无限重复的循环中,这些线程不停地获取值,然后发起CAS操作,但是发现这个值被别人改过了,于是再次进入下一个循环,获取值,发起CAS操作又失败了,再次进入下一个循环。

java8提供了高并发下非常优秀两个类LongAdder与DoubleAdder,以LongAdder为例,他就是尝试使用分段CAS以及自动分段迁移的方式来大幅度提升多线程高并发执行CAS操作的性能!

public class LongAdder extends Striped64 implements Serializable

LongAdder继承了Striped64类,来实现累加功能的,它是实现高并发累加的工具类;
Striped64的设计核心思路就是通过内部的分散计算来避免竞争。
Striped64内部包含一个base和一个Cell[] cells数组,又叫hash表。
没有竞争的情况下,要累加的数通过cas累加到base上;如果有竞争的话,会将要累加的数累加到Cells数组中的某个cell元素里面。所以整个Striped64的值为sum=base+∑[0~n]cells。

1、Striped64内部三个重要的成员变量:

/** 
 * 存放Cell的hash表,大小为2的幂。 
 */  
transient volatile Cell[] cells;  
/** 
 * 基础值,
 * 1. 在没有竞争时会更新这个值;
 * 2. 在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base。 
 */  
transient volatile long base;  
/** 
 * 自旋锁,通过CAS操作加锁,用于保护创建或者扩展Cell表。 
 */  
transient volatile int cellsBusy;  

(1)成员变量cells
AtomicInteger只有一个value,所有线程累加都要通过cas竞争value这一个变量,高并发下线程争用非常严重;
而LongAdder则有两个值用于累加,一个是base,它的作用类似于AtomicInteger里面的value,在没有竞争的情况不会用到cells数组,它为null,这时使用base做累加,有了竞争后cells数组就上场了,第一次初始化长度为2,以后每次扩容都是变为原来的两倍,直到cells数组的长度大于等于当前服务器cpu的数量为止就不在扩容(想下为什么到超过cpu数量的时候就不再扩容);每个线程会通过线程对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个cell对象上。

(2)成员变量cellsBusy

cellsBusy,它有两个值0 或1,它的作用是当要修改cells数组时加锁,防止多线程同时修改cells数组,0为无锁,1为加锁,加锁的状况有三种
1)cells数组初始化的时候;
2)cells数组扩容的时候;
3)如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;

(3)成员变量base

它有两个作用:
1)在开始没有竞争的情况下,将累加值累加到base
2)在cells初始化的过程中,cells不可用,这时会尝试将值累加到base上;

2、Cell内部类

 //为提高性能,使用注解@sun.misc.Contended,用来避免伪共享,
 @sun.misc.Contended static final class Cell {
        //用来保存要累加的值
        volatile long value;
        Cell(long x) { value = x; }
        //使用UNSAFE类的cas来更新value值
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }
        private static final sun.misc.Unsafe UNSAFE;
        //value在Cell类中存储位置的偏移量;
        private static final long valueOffset;
        //这个静态方法用于获取偏移量
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

这个类很简单,final类型,内部有一个value值,使用cas来更新它的值;Cell类唯一需要注意的地方就是Cell类的注解@sun.misc.Contended。

伪共享

说清楚Contended需要理解伪共享,cpu的缓存系统中是以缓存行(cache line)为单位存储的,缓存行是2的整数幂个连续字节,一般为32-256个字节。最常见的缓存行大小是64个字节,cache line是cache和memory之间数据传输的最小单元。

当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。

并发编程之Java8如何优化CAS性能

在core1上运行的线程想更新变量X,同时core2上的线程想要更新变量Y。不幸的是,这两个变量在同一个缓存行中。每个线程都要去竞争缓存行的所有权来更新变量。如果core1获得了所有权,缓存子系统将会使core2中对应的缓存行失效。当core2获得了所有权然后执行更新操作,core1就要使自己对应的缓存行失效。这会来来回回的经过L3缓存,大大影响了性能。如果互相竞争的核心位于不同的插槽,就要额外横跨插槽连接,问题可能更加严重。

举个例子:

public final static class VolatileLong {
    public volatile long value = 0L;
}

定义一个VolatileLong类型的数组,然后让多个线程同时并发访问这个数组,这时可以想到,在多个线程同时处理数据时,数组中的多个VolatileLong对象可能存在同一个缓存行中,通过上文可知,这种情况就是伪共享。

 

为了解决这个问题在jdk1.6会采用long padding的方式,就是在防止被伪共享的变量的前后加上7个long类型的变量

public final static class VolatileLong {
    volatile long p0, p1, p2, p3, p4, p5, p6;
    public volatile long value = 0;
    volatile long q0, q1, q2, q3, q4, q5, q6;
}

为什么这里需要加7个long?原因是缓存行大小是64个字节,一个long是8字节,为了填充一个缓存行,value自己是8字节再加上7个long即64字节

Java内存布局(Java Memory Layout)

对于HotSpot JVM,所有对象都有两个字长的对象头。第一个字是由24位哈希码和8位标志位(如锁的状态或作为锁对象)组成的Mark Word。第二个字是对象所属类的引用。如果是数组对象还需要一个额外的字来存储数组的长度。每个对象的起始地址都对齐于8字节以提高性能。因此当封装对象的时候为了高效率,对象字段声明的顺序会被重排序成下列基于字节大小的顺序:

  1. doubles (8) 和 longs (8)
  2. ints (4) 和 floats (4)
  3. shorts (2) 和 chars (2)
  4. booleans (1) 和 bytes (1)
  5. references (4/8)
  6. <子类字段重复上述顺序>

在jdk1.8中加入了@sun.misc.Contended,移除了不优雅的long padding

注意:使用@sun.misc.Contended时,需要添加参数配置-XX:-RestrictContended,否则该注解不起作用。

 

并发编程之Java8如何优化CAS性能

在LongAdder的底层实现中,首先有一个base值,刚开始多线程来不停的累加数值,都是对base进行累加的,比如刚开始累加成了base = 5时候发现线程增多(也就是casBase操作失败),那么它会自动使用cell数组,每一个线程对应于一个cell,在每一个线程中对该cell进行cas操作,这样就可以提高并发效率,分散并发压力。在LongAdder中的源码如下: 

    public void add(long x) {

        Cell[] as; long b, v; int m; Cell a;
        /**
         * 如果一下两种条件则继续执行if内的语句
         * 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组)
         * 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败(casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内;
         * casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值
         * casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是类似于AtomticInteger处理方式,使用cas做累加。
         */
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否#不#存在争用,如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。

            boolean uncontended = true;
            /**
            *1. as == null : cells数组未被初始化,成立则直接进入if执行cell初始化
            *2. (m = as.length - 1) < 0: cells数组的长度为0
            *条件1与2都代表cells数组没有被初始化成功,初始化成功的cells数组长度为2;
            *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的长度不为0,则通过getProbe方法获取当前线程Thread的threadLocalRandomProbe变量的值,初始为0,然后执行threadLocalRandomProbe&(cells.length-1 ),相当于m%cells.length;如果cells[threadLocalRandomProbe%cells.length]的位置为null,这说明这个位置从来没有线程做过累加,需要进入if继续执行,在这个位置创建一个新的Cell对象;
            *4. !(uncontended = a.cas(v = a.value, v + x)):尝试对cells[threadLocalRandomProbe%cells.length]位置的Cell对象中的value值做累加操作,并返回操作结果,如果失败了则进入if,重新计算一个threadLocalRandomProbe;

            如果进入if语句执行longAccumulate方法,有三种情况
            1. 前两个条件代表cells没有初始化,
            2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作,
            3. 第四个条件代表产生了冲突,uncontended=false
            **/
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

 

当线程增多,每个cell中分配的线程数也会增多,当其中一个线程操作失败的时候,它会自动迁移到下一个cell中进行操作,这也就解决了CAS空旋转,自旋不停等待的问题。这就是自动迁移机制
具体在源码中的体现如下(该段代码在抽象类Striped64下,LongAdder继承自该类):

 

三个参数第一个为要累加的值,第二个为null,第三个为wasUncontended表示调用方法之前的add方法是否未发生竞争;final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0,说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员静态私有变量probeGenerator的值,后面会详细将hash值的生成;
        //另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况
        //1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组后争用的),是第一次执行base的cas累加操作失败;
        //2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false,那么这里会将其重新置为true;第一次执行操作失败;
       //凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0;
        int h;
        if ((h = getProbe()) == 0) {
            //初始化ThreadLocalRandom;
            ThreadLocalRandom.current(); // force initialization
            //将h设置为0x9e3779b9
            h = getProbe();
            //设置未竞争标记为true
            wasUncontended = true;
        }
        //cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败;collide=true代表有冲突,collide=false代表无冲突 
        boolean collide = false; 
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //这个主干if有三个分支
            //1.主分支一:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4)
            //2.主分支二:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2)
            //3.主分支三:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作,及cellbusy=1;则尝试将累加值通过cas累加到base上
            //先看主分支一
            if ((as = cells) != null && (n = as.length) > 0) {
                /**
                 *内部小分支一:这个是处理add方法内部if分支的条件3:如果被hash到的位置为null,说明没有线程在这个位置设置过值,没有竞争,可以直接使用,则用x值作为初始值创建一个新的Cell对象,对cells数组使用cellsBusy加锁,然后将这个Cell对象放到cells[m%cells.length]位置上 
                 */
                if ((a = as[(n - 1) & h]) == null) {
                    //cellsBusy == 0 代表当前没有线程cells数组做修改
                    if (cellsBusy == 0) {
                        //将要累加的x值作为初始值创建一个新的Cell对象,
                        Cell r = new Cell(x); 
                        //如果cellsBusy=0无锁,则通过cas将cellsBusy设置为1加锁
                        if (cellsBusy == 0 && casCellsBusy()) {
                            //标记Cell是否创建成功并放入到cells数组被hash的位置上
                            boolean created = false;
                            try {
                                Cell[] rs; int m, j;
                                //再次检查cells数组不为null,且长度不为空,且hash到的位置的Cell为null
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    //将新的cell设置到该位置
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                //去掉锁
                                cellsBusy = 0;
                            }
                            //生成成功,跳出循环
                            if (created)
                                break;
                            //如果created为false,说明上面指定的cells数组的位置cells[m%cells.length]已经有其它线程设置了cell了,继续执行循环。
                            continue;
                        }
                    }
                   //如果执行的当前行,代表cellsBusy=1,有线程正在更改cells数组,代表产生了冲突,将collide设置为false
                    collide = false;

                /**
                 *内部小分支二:如果add方法中条件4的通过cas设置cells[m%cells.length]位置的Cell对象中的value值设置为v+x失败,说明已经发生竞争,将wasUncontended设置为true,跳出内部的if判断,最后重新计算一个新的probe,然后重新执行循环;
                 */
                } else if (!wasUncontended)  
                    //设置未竞争标志位true,继续执行,后面会算一个新的probe值,然后重新执行循环。 
                    wasUncontended = true;
                /**
                *内部小分支三:新的争用线程参与争用的情况:处理刚进入当前方法时threadLocalRandomProbe=0的情况,也就是当前线程第一次参与cell争用的cas失败,这里会尝试将x值加到cells[m%cells.length]的value ,如果成功直接退出  
                */
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                /**
                 *内部小分支四:分支3处理新的线程争用执行失败了,这时如果cells数组的长度已经到了最大值(大于等于cup数量),或者是当前cells已经做了扩容,则将collide设置为false,后面重新计算prob的值
                else if (n >= NCPU || cells != as)
                    collide = false;
                /**
                 *内部小分支五:如果发生了冲突collide=false,则设置其为true;会在最后重新计算hash值后,进入下一次for循环
                 */
                else if (!collide)
                    //设置冲突标志,表示发生了冲突,需要再次生成hash,重试。 如果下次重试任然走到了改分支此时collide=true,!collide条件不成立,则走后一个分支
                    collide = true;
                /**
                 *内部小分支六:扩容cells数组,新参与cell争用的线程两次均失败,且符合库容条件,会执行该分支
                 */
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        //检查cells是否已经被扩容
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //为当前线程重新计算hash值
                h = advanceProbe(h);

            //这个大的分支处理add方法中的条件1与条件2成立的情况,如果cell表还未初始化或者长度为0,先尝试获取cellsBusy锁。
            }else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    //初始化cells数组,初始容量为2,并将x值通过hash&1,放到0个或第1个位置上
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    //解锁
                    cellsBusy = 0;
                }
                //如果init为true说明初始化成功,跳出循环
                if (init)
                    break;
            }
            /**
             *如果以上操作都失败了,则尝试将值累加到base上;
             */
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
(1)关于hash的生成

hash是LongAdder定位当前线程应该将值累加到cells数组哪个位置上的,所以hash的算法是非常重要的,下面就来看看它的实现。
java的Thread类里面有一个成员变量

  @sun.misc.Contended("tlr")
    int threadLocalRandomProbe;

threadLocalRandomProbe这个变量的值就是LongAdder用来hash定位Cells数组位置的,平时线程的这个变量一般用不到,它的值一直都是0。

在LongAdder的父类Striped64里通过getProbe方法获取当前线程threadLocalRandomProbe的值:
    static final int getProbe() {
        //PROBE是threadLocalRandomProbe变量在Thread类里面的偏移量,所以下面语句获取的就是threadLocalRandomProbe的值;
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

(2)threadLocalRandomProbe的初始化

线程对LongAdder的累加操作,在没有进入longAccumulate方法前,threadLocalRandomProbe一直都是0,当发生争用后才会进入longAccumulate方法中,进入该方法第一件事就是判断threadLocalRandomProbe是否为0,如果为0,则将其设置为0x9e3779b9

 int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); 
            h = getProbe();
            //设置未竞争标记为true
            wasUncontended = true;
        }

重点在这行ThreadLocalRandom.current();

    public static ThreadLocalRandom current() {
        if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
            localInit();
        return instance;
    }

在current方法中判断如果probe的值为0,则执行locaInit()方法,将当前线程的probe设置为非0的值,该方法实现如下:

  static final void localInit() {
        //private static final AtomicInteger probeGenerator =
        new AtomicInteger();
        //private static final int PROBE_INCREMENT = 0x9e3779b9;
        int p = probeGenerator.addAndGet(PROBE_INCREMENT);
        //prob不能为0
        int probe = (p == 0) ? 1 : p; // skip 0
        long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
        //获取当前线程
        Thread t = Thread.currentThread();
        UNSAFE.putLong(t, SEED, seed);
        //将probe的值更新为probeGenerator的值
        UNSAFE.putInt(t, PROBE, probe);
    }

probeGenerator 是static 类型的AtomicInteger类,每执行一次localInit()方法,都会将probeGenerator 累加一次0x9e3779b9这个值;,0x9e3779b9这个数字的得来是 2^32 除以一个常数,这个常数就是传说中的黄金比例 1.6180339887;然后将当前线程的threadLocalRandomProbe设置为probeGenerator 的值,如果probeGenerator 为0,这取1;
(3)threadLocalRandomProbe重新生成

就是将prob的值左右移位 、异或操作三次

    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }


 

当我们需要获取多线程更新后的值的时候,只需要将base和cell数组中的值加起来返回即可

/**
 * Returns the current sum.  The returned value is <em>NOT</em> an
 * atomic snapshot; invocation in the absence of concurrent
 * updates returns an accurate result, but concurrent updates that
 * occur while the sum is being calculated might not be
 * incorporated.
 *
 * @return the sum
 */
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

 

相关标签: 并发编程