欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  移动技术

JUC—两万字的atomic原子类源码深度解析

程序员文章站 2022-04-15 19:21:37
基于JDK1.8详细介绍了JUC下面的atomic子包中的大部分原子类的底层源码实现,比如AtomicInteger、AtomicIntegerArray、AtomicStampedReference等原子类源码。最后还介绍了JDK1.8对原子类的增强,比如LongAdder和LongAccumulator的原理!...

  基于JDK1.8详细介绍了JUC下面的atomic子包中的大部分原子类的底层源码实现,比如AtomicInteger、AtomicIntegerArray、AtomicStampedReference等原子类源码。最后还介绍了JDK1.8对原子类的增强,比如LongAdder和LongAccumulator的原理!

1 atomic的概述

  JDK1.5之前,为了保证Java中对单个变量的多个独立操作的原子性和安全性,通常会使用到synchronized锁,但是synchronized需要底层操作系统mutex资源的支持,这是一种重量级资源,性能比较低!
  JDK1.5的时候,新增了JUC包,增加了许多和同步有关的特性,大大提高了使用Java进行并发编程的效率,比如并发集合、并发队列、新lock锁等。另外,JUC包下面还提供了一个java.util.concurrent.atomic子包,这个atomic包中的类用于在多线程环境下实现单个变量多个独立操作(比如读-写)的连续原子性,并且都比较高效,因为它们都是由基于偏移量(类似于指针)的非阻塞CAS算法实现,用于替代锁的使用。
  JDK1.8的atomic包中具有17个原子类,根据支持的更新变量的类型,我们可以对常用原子类分为三种,分别是原子更新单个变量、原子更新数组、原子更新引用属性(字段)。
  atomic 包下的常用原子类如下:

摘要
AtomicBoolean 用原子方式更新的 boolean 值。
AtomicInteger 用原子方式更新的 int 值。
AtomicLong 用原子方式更新的 long 值。
AtomicReference< V > 用原子方式更新的对象引用。
AtomicMarkableReference< V > 维护带有boolean标志位的对象引用,可以原子方式对其进行更新。
AtomicStampedReference< V > 维护带有int整数版本号的对象引用,可用原子方式对其进行更新。
AtomicIntegerArray 用原子方式更新其元素的 int 数组。
AtomicLongArray 用原子方式更新其元素的 long 数组。
AtomicReferenceArray< E > 用原子方式更新其元素的对象引用数组。
AtomicIntegerFieldUpdater< T > 基于反射的实用工具,可以对指定类的指定非私有非静态的 volatile int 字段进行原子更新。
AtomicLongFieldUpdater< T > 基于反射的实用工具,可以对指定类的指定非私有非静态的 volatile long 字段进行原子更新。
AtomicReferenceFieldUpdater< T,V > 基于反射的实用工具,可以对指定类的指定非私有非静态的 volatile 引用字段进行原子更新 。
LongAdder JDK1.8新增加的原子类累加器,使用热点数据分离的思想对long数据进行加法运算,性能更佳!
LongAccumulator JDK1.8新增加的原子类累加器,使用热点数据分离的思想对long数据进行指定规则的运算,性能更佳!
DoubleAdder JDK1.8新增加的原子类累加器,使用热点数据分离的思想对double数据进行加法运算,性能更佳!
DoubleAccumulator JDK1.8新增加的原子类累加器,使用热点数据分离的思想对double数据进行指定规则的运算,性能更佳!

  实际上Java中atomic包下的原子类的基石就是:volatile字段修饰符+CAS算法(Unsafe提供)。本文没有对这两个基本知识点做深入讲解,因为前面的文章中已经讲了,都是深入到了虚拟机源码级别,如果想要深入了解原子类的原理,应该要看看以下文章:Java中的volatile实现原理深度解析以及应用Java中的CAS实现原理深度解析与应用案例

2 原子更新单个变量

2.1 基本原子类

  通过原子的方式更新单个变量,Atomic包提供了以下4个基础类:

  1. AtomicBoolean:用原子方式更新的 boolean 值。
  2. AtomicInteger:用原子方式更新的 int 值。
  3. AtomicLong:用原子方式更新的 long 值。
  4. AtomicReference< V >:用原子方式更新的对象引用。

  上面四个原子类的原理几乎一致,我们以AtomicInteger来讲解。

2.1.1 重要属性

  AtomicInteger 中保存了一个核心字段value,它就代表了Atomiclnteger 的当前实际取值,所有的方法都是围绕该值进行的。
  还有一个属性valueOffset,它保存着value 字段在Atomiclnteger 对象中的偏移量。Unsafe中的CAS方法都是通过字段的偏移量来操作字段的。

/**
 * 内部的value属性,它就代表了Atomiclnteger 的当前实际取值。
 * 所有的方法都是围绕该值进行的
 */
private volatile int value;

/**
 * 使用给定值初始化value
 *
 * @param initialValue 给定值
 */
public AtomicInteger(int initialValue) {
    value = initialValue;
}

/**
 * 初始化value值为0
 */
public AtomicInteger() {
}

/**
 * 内部实际上依赖于Unsafe类的方法,堆value值进行操作
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
/**
 * value字段的偏移量
 */
private static final long valueOffset;

static {
    try {
        //初始化value字段的偏移量
        valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) {
        throw new Error(ex);
    }
}

2.1.2 重要方法

/**
 * 获取当前最新值
 *
 * @return 当前最新值
 */
public final int get() {
    return value;
}

/**
 * 设置给定新值
 *
 * @param newValue 新值
 */
public final void set(int newValue) {
    value = newValue;
}

/**
 * 原子性的将当前值设为给定新值,返回旧值
 *
 * @param newValue 新值
 * @return 旧值
 */
public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
}


/**
 * 如果当前值等于预期值,则以原子方式将该值设置为给定的新值
 *
 * @param expect 预期值
 * @param update the new value
 * @return true 更新成功 false 更新失败
 */
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

/**
 * 原子性的将当前值加1,返回旧值
 *
 * @return 旧值
 */
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}


/**
 * 原子性的将当前值减1,返回旧值
 *
 * @return 返回旧值
 */
public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}


/**
 * 原子性的将当前值增加delta,返回旧值
 *
 * @param delta 增加的值
 * @return 旧值
 */
public final int getAndAdd(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta);
}


/**
 * 原子性的将当前值加1,返回新值
 *
 * @return 更新后的值
 */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}


/**
 * 原子性的将当前值减1,返回新值
 *
 * @return 更新后的值
 */
public final int decrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}


/**
 * 原子性的将当前值增加delta,返回新值
 *
 * @param delta 增加的值
 * @return 更新后的值
 */
public final int addAndGet(int delta) {
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}


/**
 1. 最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
 2. 关于该方法的更多信息可以参考并发编程网翻译的一篇文章《AtomicLong.lazySet是如何工作的?》,文章地址是“http://ifeve.com/how-does-atomiclong-lazyset-work/”。
 3.  4. @param newValue 新值
 */
public final void lazySet(int newValue) {
    unsafe.putOrderedInt(this, valueOffset, newValue);
}

  可以看到,里面的方法都是调用的Unsafe类方法,进行的CAS操作。
  Atomic包实际上只提供了3种基本类型的原子更新:int、long、boolean,其中boolean也是转换为int的0、1进行更新的,实际上并没有char、float和double等的CAS操作,实际上char、 float、double都可以转换为int或者long在进行操作,如果DoubleAdder就是采用Double.doubleToRawLongBits将double转换为long类型的值在进行操作。

/*Unsafe只提供了3种CAS方法.*/
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

/*AtomicBoolean源码中,它是先把Boolean转换成int类型,再使用compareAndSwapInt进行CAS操作*/
public final boolean compareAndSet(boolean expect, boolean update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

2.2 带版本号的原子类

  通过原子的方式更新单个变量的原子类的升级版,Atomic包提供了以下2个类:

  1. AtomicMarkableReference< V >:维护带有标记位的对象引用,可以原子方式对其进行更新。
  2. AtomicStampedReference< V >:维护带有整数标志的对象引用,可用原子方式对其进行更新。

  上面两个原子类的方法以及原理几乎一致,属于带有版本号的原子类。我们知道CAS操作的三大问题之一就是“ABA”问题:
  CAS需要再操作值的时候,检查值有没有发生变化,如果没有发生变化则更新。但是一个值,如果原来为A,变成了B,又变成了A,那么使用CAS进行compare and set的时候,会发现它的值根本没变化过,但实际上是变化过的。
  ABA问题的解决思路就是使用版本号,1A->2B->3A,在Atomic包中,提供了一个现成的AtomicStampedReference类来解决ABA问题,使用的就是添加版本号的方法。还有一个AtomicMarkableReference实现类,它比AtomicStampedReference更加简单,AtomicStampedReference中每更新一次数据版本号也会更新一次,这样可以使用版本号统计到底更新了多少次,而AtomicMarkableReference仅仅使用了一个boolean值来表示值是否改变过,因此使用的比较少。
  这里我们以AtomicStampedReference来讲解。

2.2.1 重要属性

  AtomicStampedReference内部不仅维护了我们的传递的对象reference,还维护了一个int类型的版本号stamp,它们都被存放到一个Pair类型的内部类实例中。当AtomicStampedReference 对应的数据被修改时,除了更新数据本身外,还必须要更新版本号,这个版本号一般都是自增的。当AtomicStampedReference 设置对象值时,对象值及版本号都必须满足期望值,才会更新成功。

/**
 * Pair内部类,用于维护reference和stamp
 *
 * @param <T>
 */
private static class Pair<T> {
    /**
     * 真正的数据
     */
    final T reference;
    /**
     * 版本号
     */
    final int stamp;

    private Pair(T reference, int stamp) {
        this.reference = reference;
        this.stamp = stamp;
    }

    /**
     * 返回Pair实例
     */
    static <T> Pair<T> of(T reference, int stamp) {
        return new Pair<T>(reference, stamp);
    }
}

/**
 * 由于要维护两个属性,因此干脆使用一个内部类对象来维护这两个属性
 */
private volatile Pair<V> pair;

/**
 * 创建具有给定初始值的新 AtomicStampedReference。
 *
 * @param initialRef   初始值
 * @param initialStamp 初始版本号
 */
public AtomicStampedReference(V initialRef, int initialStamp) {
    //初始化一个Pair对象,并初始化属性值
    pair = Pair.of(initialRef, initialStamp);
}

2.2.2 重要方法

  最重要的就是compareAndSet方法,它需要传递:期望值、新值、期望版本号、新版本号,当期望值和期望版本号都与此时内部的真实值和真实版本号相等的时候,就会调用compareAndSwapObject使用一个新的Pair对象替换旧的Pair对象,同时完成reference和stamp的更新。

/**
 * 如果当前引用 == 预期引用,并且当前版本号等于预期版本号,则以原子方式将该引用和该标志的值设置为给定的更新值。
 *
 * @param expectedReference 预期引用
 * @param newReference      新引用
 * @param expectedStamp     预期版本号
 * @param newStamp          新版本号
 * @return 如果成功,则返回 true
 */
public boolean compareAndSet(V expectedReference,
                             V newReference,
                             int expectedStamp,
                             int newStamp) {
    Pair<V> current = pair;
    //一系列的判断,如果两个预期值都相等,那么尝试调用compareAndSwapObject使用新的Pair对象替代旧的Pair对象
    //这样就同时完成了reference和stamp的更新
    return
            expectedReference == current.reference &&
                    expectedStamp == current.stamp &&
                    ((newReference == current.reference &&
                            newStamp == current.stamp) ||
                            casPair(current, Pair.of(newReference, newStamp)));
}

/**
 * CAS替换内部的Pair对象的方法
 *
 * @param cmp 预期pair对象
 * @param val 新pair对象
 * @return 如果成功,则返回 true
 */
private boolean casPair(Pair<V> cmp, Pair<V> val) {
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}


/**
 * @return 获得当前保存的对象引用
 */
public V getReference() {
    return pair.reference;
}

/**
 * @return 获得当前保存的版本号
 */
public int getStamp() {
    return pair.stamp;
}


/**
 * 设置新对象引用和版本号
 *
 * @param newReference 新对象引用
 * @param newStamp     新版本号
 */
public void set(V newReference, int newStamp) {
    Pair<V> current = pair;
    //如果新对象引用以及新版本号和之前的都一样那就不设置
    //否则就是新建一个Pair对象并设置相应的属性,替代原来的Pair对象
    if (newReference != current.reference || newStamp != current.stamp)
        this.pair = Pair.of(newReference, newStamp);
}

2.2.3 案例

  实际上,如果更新的数据是无状态的数据,那么使用基本的原子类也可以完成目的,即如果线程A将值从1->2->1,而线程B仅仅是使用了值,这是没什么问题的,但是如果和业务相关联,比较的对象是有状态的,那么可能会出现严重问题。
  比如还是线程A将值从1->2->1,而线程B的业务逻辑是如果发现数据改变过,那么就不能操作,这样的话就不能单纯的比较值了,这就需要用到版本号了。

/**
 * @author lx
 */
public class AtomicStampedReferenceDemo {

    public static void main(String args[]) {
        //初始值为0,版本号为0
        AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(0, 0);

        Thread thread = new Thread(() -> {
            //先获取标志位
            int timestamp = atomicStampedReference.getStamp();
            //获取原值
            int reference = atomicStampedReference.getReference();
            System.out.println("原值reference: " + reference);
            //阻塞,等待被唤醒
            LockSupport.park();
            if (atomicStampedReference.compareAndSet(reference, reference + 1, timestamp, timestamp + 1)) {
                System.out.println("更新成功,新值reference: " + atomicStampedReference.getReference());
            } else {
                System.out.println("更新失败,新值reference: " + atomicStampedReference.getReference());
                System.out.println("虽然原值和新值相等,但是是在线程阻塞过程中值发生了变化,变化了" + atomicStampedReference.getStamp() + "次");
            }
        });
        thread.start();


        Thread thread1 = new Thread(() -> {
            //对数据先加一再减一,反复4次,最终reference的值是不变的
            for (int i = 0; i < 4; i++) {
                int timestamp = atomicStampedReference.getStamp();
                int reference = atomicStampedReference.getReference();
                if (i % 2 == 0) {
                    atomicStampedReference.compareAndSet(reference, reference + 1, timestamp, timestamp + 1);
                } else {
                    atomicStampedReference.compareAndSet(reference, reference - 1, timestamp, timestamp + 1);
                }
            }
            //唤醒阻塞的thread线程
            LockSupport.unpark(thread);
        });
        thread1.start();
    }
}

  同样的逻辑,使用普通原子类就能更新成功:

/**
 1. @author lx
 */
public class AtomicRefrenceDemo {

    public static void main(String args[]) {
        //初始值为0
        AtomicReference<Integer> atomicReference = new AtomicReference<Integer>(0);

        Thread thread = new Thread(() -> {
            int reference = atomicReference.get();
            System.out.println("原值reference: " + reference);
            //阻塞,等待被唤醒
            LockSupport.park();
            if (atomicReference.compareAndSet(reference, reference + 1)) {
                System.out.println("更新成功,新值reference: " + atomicReference.get());
            } else {
                System.out.println("更新失败,新值reference: " + atomicReference.get());
            }
        });
        thread.start();


        Thread thread1 = new Thread(() -> {
            //对数据先加一再减一,反复4次,最终的值是不变的
            for (int i = 0; i < 4; i++) {
                int reference = atomicReference.get();
                if (i % 2 == 0) {
                    atomicReference.compareAndSet(reference, reference + 1);
                } else {
                    atomicReference.compareAndSet(reference, reference - 1);
                }
            }
            //唤醒阻塞的thread线程
            LockSupport.unpark(thread);
        });
        thread1.start();
    }
}

3 原子更新数组

  通过原子的方式更新数组里的某个元素,Atomic包提供了以下3个类:

  1. AtomicIntegerArray:用原子方式更新其元素的 int 数组。
  2. AtomicLongArray:用原子方式更新其元素的 long 数组。
  3. AtomicReferenceArray< E >:用原子方式更新其元素的对象引用数组。

  上面三个原子类的原理几乎一致,我们以AtomicIntegerArray来讲解。

3.1 重要属性

  可以看到内部就是一个int的数组,然后调用Unsafe的方法对数组的元素进行操作。

/**
 * 使用Unsafe操作数组
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
/**
 * 返回数组类型的第一个元素的偏移地址(基础偏移地址)。
 * 如果arrayIndexScale方法返回的比例因子不为0,你可以通过结合基础偏移地址和比例因子访问数组的所有元素。
 */
private static final int base = unsafe.arrayBaseOffset(int[].class);
/**
 * scale最高位的1的所在位数(从左从0开始),在计算某个索引的偏移量的时候
 * 使用是该值进行位运算而不是scale进行传统乘法运算,提升效率
 */
private static final int shift;
/**
 * 底层int数组
 */
private final int[] array;

static {
    //返回数组单个元素的大小,数组中的元素的地址是连续的,64位虚拟机应该是4
    int scale = unsafe.arrayIndexScale(int[].class);
    //大小必须是2的幂次方
    if ((scale & (scale - 1)) != 0)
        throw new Error("data type scale not a power of two");
    //numberOfLeadingZeros用于返回scale的最高非零位前面的0的个数,包括符号位在内;
    //31减去scale的最高非零位前面的0的个数,就表示scale最高位的1的所在位数,比如scale为2,那么shift为1,如果scale为4,那么shift为2
    shift = 31 - Integer.numberOfLeadingZeros(scale);
}

/**
 * 某个数组索引位置的元素的偏移量
 *
 * @param i 数组索引
 * @return 该索引的偏移量
 */
private long checkedByteOffset(int i) {
    if (i < 0 || i >= array.length)
        throw new IndexOutOfBoundsException("index " + i);
    return byteOffset(i);
}

/**
 * @param i 索引位置
 * @return 返回某个数组索引位置的元素的偏移量
 */
private static long byteOffset(int i) {
    //这里就能明白shift的作用了,对于2的幂次方的scale:
    //这里可以使用scale的最高为1的位置shift的位运算i << shift,代替scale*i的传统运算,效率提高
    //比如scale=4,那么shift=2,如果i=3,那么i<<shift = 3 << 2 = 12 就等于 scale*i = 4 * 3  = 12
    //比如scale=8,那么shift=3,如果i=3,那么i<<shift = 3 << 3 = 24 就等于 scale*i = 8 * 3  = 24
    return ((long) i << shift) + base;
}


/**
 * 创建给定长度的新 AtomicIntegerArray。
 *
 * @param length 给定长度
 */
public AtomicIntegerArray(int length) {
    array = new int[length];
}

/**
 * 创建与给定数组具有相同长度的新 AtomicIntegerArray,并从给定数组复制其所有元素。
 *
 * @param array 给定数组
 * @throws NullPointerException 如果数组为 null
 */
public AtomicIntegerArray(int[] array) {
    // 克隆数组,元素浅克隆
    this.array = array.clone();
}

3.2 重要方法

  其常用方法如下,基于Unsafe的volatile和CAS操作:

/**
 * 获取i索引位置的当前值
 *
 * @param i 多赢
 * @return 当前值
 */
public final int get(int i) {
    return getRaw(checkedByteOffset(i));
}

private int getRaw(long offset) {
    //volatile的获取最新值
    return unsafe.getIntVolatile(array, offset);
}

/**
 * 在i索引位置设定为指定新值
 *
 * @param i        索引
 * @param newValue 新值
 */
public final void set(int i, int newValue) {
    //volatile的写
    unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}


/**
 * 以原子方式将元素设置在i索引位置,并返回旧值
 *
 * @param i        索引
 * @param newValue 新值
 * @return 旧值
 */
public final int getAndSet(int i, int newValue) {
    return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}

/**
 * 以原子方式将输入值与数组中索引i的元素相加,并返回旧值
 *
 * @param i     索引
 * @param delta 相加的数据
 * @return 旧值
 */
public final int getAndAdd(int i, int delta) {
    return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

/**
 * 以原子方式将输入值与数组中索引i的元素相加,并返回新值
 *
 * @param i     索引
 * @param delta 相加的数据
 * @return 更新后的值
 */
public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}


/**
 1. 如果当前值等于预期值,则以原子方式将数组位置i的元素设置成新值。
 2.  3. @param i      索引
 3. @param expect 预期值
 4. @param update 新值
 5. @return true表示CAS成功 false 表示CAS失败
 */
public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

4 原子更新字段属性

  通过原子的方式更新对象里的某个字段,Atomic包提供了以下3个类:

  1. AtomicIntegerFieldUpdater< T >:基于反射的实用工具,可以对指定类的指定非私有的 volatile int
    字段进行原子更新。
  2. AtomicLongFieldUpdater< T >:基于反射的实用工具,可以对指定类的指定非私有的 volatile long
    字段进行原子更新。
  3. AtomicReferenceFieldUpdater< T,V >:基于反射的实用工具,可以对指定类的指定非私有的 volatile
    引用字段进行原子更新。

  以上3个类的原理几乎一样,我们以AtomicIntegerFieldUpdater来讲解。
  AtomicIntegerFieldUpdater实际上是一个抽象类,它的实现类实际上在它的内部而且是私有的,因此只能使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性字符串名。
  另外,这里对于对象的字段的设置是先采用getDeclaredField方法反射获取的对应字段的Filed对象,然后在对Filed对象进行操作,并且没有设置setAccessible权限,因此类的字段属性不能是私有属性!
  由于CAS 操作会通过对象实例中的偏移量直接进行赋值,即Unsafe. objectFieldOffset()方法。因此,它不支持对static属性的赋值。
  对象的字段还应该被设置为volatile类型,这样就能获取到最新的值。

/**
 1. @author lx
 */
public class AtomicFieldUpdaterTest {
    public static void main(String[] args) {
        AtomicIntegerFieldUpdater<User> old = AtomicIntegerFieldUpdater.newUpdater(User.class, "old");
        AtomicReferenceFieldUpdater<User, String> name = AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");

        User user = new User("user", 10);

        System.out.println(old.getAndIncrement(user));
        System.out.println(old.get(user));

        System.out.println(name.getAndSet(user, "user2"));
        System.out.println(name.get(user));
    }

    public static class User {
        volatile String name;
        volatile int old;

        User(String name, int old) {
            this.name = name;
            this.old = old;
        }

        public String getName() {
            return name;
        }

        public int getOld() {
            return old;
        }
    }
}

5 原子类的加强

  JDK1.8的时候,新增了四个原子类:

  1. LongAdder:long类型的数值累加器,从0开始累加,累加规则为加法运算。
  2. LongAccumulator:long类型的数值累加器,可从指定值开始累加,可指定累加规则。
  3. DoubleAdder:double类型的数值累加器,从0开始累加,累加规则为加法运算。
  4. DoubleAccumulator:double类型的数值累加器,可从指定值开始累加,可指定累加规则。

  自从原子类问世之后,多线程环境下如果用于统计计数操作,一般可以使用AtomicLong来代替锁作为计数器,AtomicLong 通过CAS 提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说它的性能己经很好了,那么,它们有什么缺点吗?
  实际上,AtomicLong等其他传统的atomic原子类对于数值的更改,通常都是在一个无限循环(自旋)中不断尝试CAS 的修改操作,一旦CAS失败则循环重试,这样来保证最终CAS操作成功。如果竞争不激烈,那么修改成功的概率就很高,但是如果在高并发下大量线程频繁的竞争修改计数器,会造成一次CAS修改失败的概率就很高。在大量修改失败时,这些原子操作就会进行多次循环尝试,白白浪费CPU 资源,因此性能还是会受到影响。
  JDK1.8新增这些类,正是为了解决高并发环境下由于频繁读写AtomicLong等计数器而可能造成某些线程持续的空转(循环)进而浪费CPU的情况,它们也被称为“累加器”!
  LongAdder和DoubleAdder,LongAccumulator和DoubleAccumulator的原理差不多。实际上DoubleAdder中对于double的累加也是先通过Double.doubleToRawLongBits将double类型转换为long类型来进行计算的,并且底层也是存储的long类型的值,在获取总和的时候又会通过Double.longBitsToDouble将存储的long值转换为double。
  下面我们将对LongAdder和DoubleAdder进行讲解!

5.1 LongAdder

5.1.1 LongAdder的概述

public class LongAdder
  extends Number
  implements Serializable

  来自于JDK 1.8的LongAdder,作为一个long类型数值的累加器,被用来克服在高并发下使用AtomicLong 可能由于线程频繁自旋而浪费CPU的缺点。
  LongAdder的解决方式是采用了“热点数据分离”的基本思想:
  传统的原子类的内部通常维护了一个对应类型的value属性值,多个线程之间的CAS竞争实际上就是在争夺对这个value属性的更新权,但是CAS操作只会保证同时只有一个线程能够更新成功,因此AtomicLong(包括其他传统原子类)的性能瓶颈就是由于过多线程同时去竞争一个变量的更新而产生的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源,最终的结果就是统计被分解出来的多个变量的总和,这样就能大大缓解多线程竞争导致的性能问题,这就是“热点数据分离”的基本思想。这种思想在高并发环境下非常有用,类似的还有“减小锁的粒度”的思想,除了新的原子类之外,在JDK1.8的ConcurrentHashMap中对于结点数量的统计并没有采用单个变量计数,也是采用的类似于LongAdder的“热点数据分离”的基本思想。

JUC—两万字的atomic原子类源码深度解析

5.1.2 LongAdder的原理

5.1.2.1 内部结构

  下面是Striped64中的常用属性,LongAdder实际上就是使用的这些属性, 没什么自己的特别的属性:

//Striped64中的属性

/**
 * 用来实现CAS锁的资源,值为0时表示没有锁,值为1时表示已上锁,扩容Cell 数组或者初始化Cell 数组时会使用到该值
 * 使用CAS的同时唯一成功性来保证同一时刻只有一条线程可以进入扩容Cell 数组或者初始化Cell 数组的代码
 */
transient volatile int cellsBusy;

/**
 * volatile long 类型的基本属性,在没有CAS竞争时用来统计计数
 */
transient volatile long base;

/**
 * volatile Cell类型的数组,要么为null,当发生CAS更新base出现竞争的时候初始化
 * 此后就一直使用该数组来统计计数,初始容量为2,数组可扩容,大小为2的幂次方
 */
transient volatile Striped64.Cell[] cells;

/**
 1. ccells数组的元素类型,由于是数组,导致内存连续,因此可以使用缓存填充(注解方式)来避免伪共享。
 */
@sun.misc.Contended
static final class Cell {
    /**
     * 内部就是一个volatile long类型的基本属性,线程对数组某个索引位置的更新实际上就是更新该值
     */
    volatile long value;

    Cell(long x) {
        value = x;
    }

    /**
     * 更新value指的CAS方法
     *
     * @param cmp 预期值
     * @param val 新值
     * @return true 成功  false 失败
     */
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    //对于数值的更新都是CAS的操作

    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Striped64.Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

  LongAdder类继承自Striped64类,Striped64被设计用来减少高并发环境的线程冲突,Striped64类是对外不可见的,也是这四个累加器类的公共抽象父类,它们很多的操作也是直接调用Striped64的方法,在Striped64 内部维护着三个主要的变量:

  1. cellsBusy :用来实现简单的CAS锁,状态值只有0和l,当创建Cell 元素,扩容Cell 数组或者初始化Cell 数组时,使用CAS 操作该变量来保证同时只有一个线程可以进行其中之一的操作。
  2. base:volatile int类型的一个基本属性,热点分离的实现之一,在没有存在并发CAS操作的时候记录被用于记录累加值,也用来记录初始值。
  3. cells:volatile Cell[ ] 类型的一个对象数组,热点分离的实现之二,当使用CAS更新base基值失败(出现CAS竞争)的时候,就会初始化该数组,然后尝试通过更新该数组中的某个位置的值来记录累加值。

  由此我们可以明确的知道,LongAdder的热点分离思想的具体实现是将value分散为一个base变量+一个cells数组。
  这里采用数组的用途很明显,那就是对于并发下的线程随机分配到数组不同索引位置,并对该位置的值进行更新,因此理论上采用一个数组就行了,那么为什么不采用单独一个数组还要加一个变量呢?在没有竞争的情况下,如果还是初始化一个数组然后更新数组某个索引的值就有些得不偿失了,因为数组明显比单个变量占用更多的空间,其更新效率也没有单独更新一个变量那么块。
  因此,综合考虑下LongAdder采用一个变量base和一个数组cells一起来计数,它们的使用流程如下:在更新计数的时候如果没有CAS竞争,即并发度较低时就一直使用base变量来统计计数,此时cells数组是null,即没有初始化或者锁延迟初始化,就和AtomicLong一样。一旦出现对base变量的CAS竞争,即高并发环境下某些线程CAS更新base失败,那么就初始化cells数组,并且此后都使用cells数组来进行统计计数,如果数组某一个索引位置的Cell更新时仍然出现了竞争,那么cells数组可能会扩容或者寻找新的Cell。在统计总和时对base和cells数组中的值进行求和即可,这种方法在热点分离的基础上还优化了内存的开销。
  初始化cells数组中的容量为2,扩容时必须保证容量为2的幂次方,数组里面的数据是Cell 类型,Cell类中仅仅只有一个value属性,实际上就是对value值的封装,封装成为类的原因主要是方便调用方法对某个位置的value值进行CAS的更新,以及作缓存填充操作。
  因为数组的内存空间必须是连续的,而一个cell内部只有一个int value属性,非常有可能多个cell对象存在同一个缓存行中,当CAS的更新某一个Cell的值时会将该Cell所属的缓存行失效,因此会同时造成其他位于同一个缓存行的相邻Cell缓存也同时失效,这样后续线程必须重主存获取相邻的Cell,这就造成了“伪共享”的问题,两个Cell的访问应该是互不影响的,但是由于在同一个缓存行,造成了和“共享”的现象,因此称为“伪共享”。这里的Cell 类使用了 @sun.misc.Contended注解修饰,这是JDK1.8缓存填充的新方式,这样一个Cell对象就占据一个缓存行的大小,解决了伪共享的问题,进一步提升了性能。 关于伪共享,可以看这篇文章:Java中的伪共享深度解析以及避免方法
  LongAdder仅有一个空构造器:

/**
 * 空的构造器,Cell数组延迟初始化
 */
public LongAdder() {
}

5.1.2.2 add增加给定值

public void add(long x)

  add方法用于增加给定值。这个方法是LongAdder的核心方法之一。代码比较多,且比较难以理解。
  大概步骤为:

  1. as变量保存此时的cells数组。判断当as为null时,那么CAS更新base的值,如果更新成功,那么add方法就结束了,这就是采用base属性更新的逻辑。
  2. 如果as不为null,或者CAS更新base失败之后,都会进入if代码块,内部就是采用cells数组更新的逻辑:
    1. uncontended变量表示冲突的标记,初始化true。
    2. if代码块中又是一个if判断,通过||连接四个表达式:
      1. 如果as为null,说明cells数组没有初始化。如果条件满足,继续那么进入if代码块,如果该条件不满足,继续向后判断;
      2. m等于cells数组长度减一。如果m小于0,说明数组没初始化完毕。如果条件满足,继续那么进入if代码块,如果该条件不满足,继续向后判断。
      3. getProbe()用于获取当前线程的threadLocalRandomProbe值,是一个随机生成的探测哈希值,不同的线程不一样,初始值为0。通过getProbe() & m 计算当前线程应该访问数组的某个索引元素并赋值给a。另外,threadLocalRandomProbe也被用在ThreadLocalRandom中。如果a为null,说明该索引位置还没有初始化元素对象。如果条件满足,继续那么进入if代码块,如果该条件不满足,继续向后判断。
      4. 最后调用a.cas方法尝试CAS的将base值从v = a.value更新为v+x,即,使用该位置记录增加值,CAS的结果赋值给uncontended,如果还是CAS更新失败,即说明这个位置还是有冲突。如果条件满足,继续那么进入if代码块,如果该条件不满足,表示CAS成功,使用数组该位置的CAS记录更新成功,那么add方法结束。
      5. 即如果上面的四个表达式有一个返回true,那么就是进入if代码块,表示cells需要(正在)初始化、或者某个位置的Cell需要初始化,或者cells的竞争激烈需要扩容。在if代码块中调用longAccumulate方法,该方法是Striped64的方法,用于进一步处理上面的问题并且最终会新增给定值成功,add方法结束。传递参数为:x、null、uncontended。实际上即使进入了longAccumulate方法,还是有可能最终会使用base属性进行更新的,那就是多个线程判断cells为null并同时进入的情况下,后面会讲到。
/**
 * 增加给定值
 *
 * @param x 给定值
 */
public void add(long x) {
    //初始化一些变量
    Striped64.Cell[] as;
    long b, v;
    int m;
    Striped64.Cell a;
    /*
     * as保存此时的cells数组:
     * 如果不为null,那么直接进入if代码块
     * 如果为null,说明合格数组还没有初始化,执行后面的casBase操作,b保存base的值
     * 随后尝试CAS的将base值从b更新为b + x,即使用base记录增加值,如果CAS成功那么不进入if代码块,add方法就结束了
     * CAS失败同样进入if代码块,表示CAS更新bae的值出现了并发竞争
     *
     * 即,当cells数组为null并且CAS更新base的值成功之后,add方法就结束了,这就是采用base更新的逻辑
     * 如果cells不为null,或者CAS更新base失败之后,都会进入if代码块,下面就是采用数组更新的逻辑
     */
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        //uncontended用于表示是否没有进行CAS操作,初始化true,当CAS失败的时候会变成false
        boolean uncontended = true;
        /*
         * as == null
         *      如果as为null,说明cells数组没有初始化。如果条件满足,继续那么进入if代码块,如果该条件不满足,继续向后判断
         * (m = as.length - 1) < 0
         *      m等于cells数组长度减一,如果m小于0,说明数组没初始化完毕。如果条件满足,继续那么进入if代码块,如果该条件不满足,继续向后判断
         * (a = as[getProbe() & m]) == null
         *      getProbe()用于获取当前线程的threadLocalRandomProbe值,是一个随机生成的探测哈希值,不同的线程不一样,初始值为0
         *      通过getProbe() & m 计算当前线程应该访问数组的某个索引元素并赋值给a。另外,threadLocalRandomProbe也被用在ThreadLocalRandom中
         *      如果a为null,说明该索引位置还没有初始化元素对象。如果条件满足,继续那么进入if代码块,如果该条件不满足,继续向后判断
         * !(uncontended = a.cas(v = a.value, v + x))
         *      最后调用a.cas方法尝试CAS的将base值从v = a.value更新为v+x,即使用该位置记录增加值,CAS的结果赋值给uncontended,如果还是CAS更新失败,即说明这个位置还是有冲突。
         *      如果条件满足,继续那么进入if代码块,如果该条件不满足,表示CAS成功,使用数组该位置的CAS记录更新成功,那么add方法结束
         *
         * 即如果上面的四个表达式有一个返回true,那么就是进入if代码块,表示cells需要/正在初始化、或者某个位置的Cell需要初始化,或者cells的竞争激烈需要扩容
         */
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
            //调用longAccumulate方法进一步处理,该方法是Striped64的方法,传递x、null、uncontended
            longAccumulate(x, null, );
    }
}

/**
 * 尝试CAS的将base值从cmp更新为val
 *
 * @param cmp 预期base值
 * @param val 新base值
 * @return 成功返回true;失败返回false
 */
final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

/**
 1. 获取当前线程的探测哈希值,不同的线程不一样,初始值为0
 */
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
5.1.2.2.1 longAccumulate统一处理

  在上面的add方法中,如果遇到base竞争时,会使用cells记录更新值,在使用cells的的时候可能会遇到这些情况:

  1. 因为cells数组是在需要它的时候才会初始化的,可能cells数组还未初始化,此时需要初始化cells数组;
  2. 计算出来的cells数组某个索引位置的Cell对象还未初始化,此时需要初始化该位置的Cell对象;
  3. 在CAS的更新某个Cell对象的时候,又发生了冲突,即多个线程定位到了同一个Cell,此时可能需要对cells数组扩容。

  面对这些情况,就需要调用longAccumulate方法进行统一的处理并最终更新给定值成功,该方法是Striped64的方法,LongAccumulator类中也是调用该方法。在LongAdder中调用该方法时,传递的参数为x、null、uncontended。
  大概步骤就是:

  1. 调用getProbe获取此线程最新的probe赋值给h,如果为0,说明probe还没有初始化,即该线程第一次进入这个方法,那么进行如下操作:
    1. 调用ThreadLocalRandom.current()初始化线程的probe属性;
    2. 重新获取此时线程最新的probe赋值给h;
    3. 对于线程第一次进入的情况,如果是CAS失败的原因肯定是因为probe没有初始化才造成的CAS竞争数组0索引结点失败(0&m=0),此时wasUncontended为false。因此在probe初始化之后,将wasUncontended统一设置为true,表示“不存在CAS竞争”,因为觉得线程初始化之后通过这个probe找到的新索引位置是大概率不会CAS失败的。
  2. 初始化一个是否可能进行扩容的标志collide,如果为true,表示则下一次循环可能会进行扩容,如果为false,表示下一次循环一定不会进行扩容;
  3. 开启一个死循环,相当于自旋,尝试处理相应的情况并且并增加给定值。但是这里的自旋的效率相比于元素针对单个vlaue变量的效率高得多:
    1. as保存此时的cells数组,如果as不为null,说明cells数组已经初始化,并且n保存此时的as数组的长度,如果n大于0,说明数组初始化完毕:
      1. 通过(n - 1) & h定位到当前线程对应的某个Cell并使用a变量保存,这个定位方式就是类似于hash函数。
      2. 如果a为null,说明当前位置还没有线程使用过,那么尝试在该位置新增一个Cell,对于数组某个位置新增Cell的操作需要保证线程安全:
        1. 如果cellsBusy为0,表示当前没有其他线程在创建或扩容cells并且也没有创建Cell,即没有获取CAS锁:
          1. 新建一个Cell对象r,内部初始值就是x。
          2. 如果此时cellsBusy还是为0,那么当前线程调用casCellsBusy方法尝试将cellsBusy从0改为1,表示获取CAS锁。如果CAS成功那么可以进入下面的if代码块,用于进行对上面新创建的Cell进行赋值。这样就可以控制同一时刻只能有一个线程进入此if代码块,这里相当于借助cellsBusy和CAS操作实现了一个CAS锁,cellsBusy为0表示无锁,cellsBusy为1表示有线程加锁。
            1. created表示新建的Cell是否放入cells数组对应位置成功的标记,初始化为false,表示未成功。
            2. 重新获取此时最新的数组并计算该线程在数组中的对应的索引位置j,并判断如果该位置还是为null,那么将上面新建的Cell对象r,放入数组对应的j索引位置,created改为true,表示放入cells数组对应位置成功。
            3. 无论上面有没有成功,最终会将cellsBusy置为0,表示释放CAS锁。为什么获取CAS所致后还需要校验一次呢?因为可能在获取上面获取as之后加锁成功之前,底层的cells数组被其他线程改变了,比如被扩容,比如该位置被其他线程抢先存入了Cell对象或者扩容后计算出的位置本来就有Cell对象等,因此在加锁之后再一次检查是很有必要的。
            4. 最后判断created如果为true,表示放入cells数组对应位置成功,新增值x就是该位置新增元素的默认值,表示增加给定值成功,那么break跳出这个死循环,longAccumulate方法结束。
            5. 到这一步,即created为false,表示放入cells数组对应位置失败,对应的索引位置非null,那么continue结束本次循环,本次竞争到了锁说明竞争不是很激烈,下一次循环中有很大概率CAS成功,也不需要后续调用advanceProbe计算新的probe值了。
          3. 到这里,表示cellsBusy为1,即有线程正在创建cells,或者在扩容cells,或者在创建Cell;或者竞争CAS锁失败,collide置为false,下一次循环一定不会扩容,而是继续尝试。由于此时并发竞争可能会比较严重,随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell。
      3. 否则,a不为null,表示cells计算出来的索引位置已经存在Cell。如果wasUncontended为false。表示此线程的probe在前面已经被初始化并且是因为add方法中的a.cas调用失败才进来该方法的:
        1. wasUncontended重置为true,下一次循环中就可能会匹配到后面的条件。随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell。
      4. 否则,表示wasUncontended为true,可能是:上面的条件满足随后wasUncontended置为true并进行的第二次自旋,或者由于该线程getProbe()) == 0而在初始化probe之后将wasUncontended置为true。此时重新调用a.cas,尝试CAS的更新位置的Cell的值,如果CAS成功,那么表示增加给定值成功,break跳出循环,longAccumulate方法结束;
      5. 否则,表示上面的CAS更新失败,还是发生了CAS冲突。判断如果as数组长度n大于等于CPU的实际可用线程数量NCPU,表示达到了数组最大容量,不能够再扩容了,此后只能循环CAS直到成功;或者如果此时的cells不等于as了,表示数组被扩容了,那么同样需要重新尝试CAS操作新数组。
        1. 上面的条件满足一个,都会将collide设置为false,随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell。
      6. 否则,表示as数组长度n小于CPU的实际可用线程数量NCPU,并且此时的cells等于as了,表示没有数组没有扩容此时可以进行数组的扩容,但这里仅仅判断!collide是否为true,即collide是否为false:
        1. 如果collide为false,那么将collide设置为true。随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell。即,在真正尝试扩容之前,还需要再至少自旋一次,寄希望能够CAS成功,尽量避免扩容操作!
      7. 否则,表示collide为true。到这里,说明并发很严重,表示真正的可以扩容了。同样首先需要通过cellsBusy获取CAS锁,如果失败,随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell,如果成功获取CAS锁:
        1. 同样需要校验该数组是否是最新的数组,因为可能在加锁之前此数组被其他线程扩容了。如果是同一个数组,那么可以扩容:新建Cell数组,容量为原容量的两倍,循环旧数组,将数据转移到新数组对应的索引位置,将新数组赋给cells,到这里,表示扩容成功。
        2. 无论上面有没有扩容成功,最终会将cellsBusy置为0,表示释放CAS锁。
        3. 到这里,表示扩容成功,或者扩容失败此时是因为其他线程了扩容了数组,因此无论如何下一次循环都会使用新数组,collide重置为false。
        4. 由于扩容了数组,但是还没有增加给定值,因此还需要继续循环,但是由于后面循环使用更大的数组,因此将会有更大的概率CAS成功,直接continue结束本次循环,也不需要后续调用advanceProbe新的probe值了。
      8. 在该轮循环中数组已经初始化的情况下,如果没有在后续的代码中break跳出循环或则continue结束本次循环的操作,那么都将会调用advanceProbe重新计算当前线程的probe值,下一次循环时就有很大概率得到另外一个的Cell位置,可以减少下次访问cells元素时的冲突概率。
    2. 否则,表示数组没有初始化完毕,那么这里尝试进行数组的初始化,初始化操作同一时刻只能有一个线程进入,这里需要获取CAS锁。如果此时cellsBusy为0,那么表示当前没有其他线程在创建、扩容cells并且也没有创建Cell,但是有可能是其他线程已经初始化数组完毕了,并且如果此时cells 还是等于 as,说明数组确实没有被初始化,并且如果调用casCellsBusy方法尝试将cellsBusy从0改为1成功,那么表示获取了CAS锁,可以进入下面的代码块,用于进行数组的初始化:
      1. init表示新建数组成功的标记,初始化为false,表示未成功;
      2. 这里同样需要校验,因为可能在加锁之前此cells被其他线程初始化了,尽管这样的概率很低,如果通过,新建Cell数组,初始容量为2,通过 h&1 计算当前线程在该数组的位置,如果h的二进制数的最低位为1那么计算结果就是1索引,否为计算结果就是0索引,随后新建一个Cell对象初始化值就是给定值x,并放入计算出的数组对应的索引位置,将新数组rs赋给cells,到这里,表示初始化成功,同时增加给定值成功,init设置为true
      3. 无论上面有没有成功,最终会将cellsBusy置为0,表示释放CAS锁。
      4. 最后判断init如果为true,表示cells数组初始化同时增加给定值成功,那么break跳出这个死循环,longAccumulate方法结束。
      5. 到这一步还没有结束,表示cells数组初始化同时增加给定值失败,因为数组已被其他线程初始化了,随后会进行下一次循环;
    3. 否则,上面两个条件都不满足,表示其他线程正在初始化数组,或者数组已被其他线程初始化完毕,或者CAS竞争锁失败,那么还是尝试使用base变量来增加给定值,如果成功那么break跳出这个死循环,longAccumulate方法结束。可以看到,即使进入了longAccumulate方法,还有有可能会使用base更新的,那就是多个线程判断cells为null并同时进入的情况下。
    4. 到这里,表示casBase失败,那么继续下一次循环。

可以看到,源码还是比较复杂的,如果实在搞不懂的同学只需要记住我们前面讲的大概流程就行了!

/**
 * CPU中通常一个内核一个线程,后来有了超线程技术,可以把一个物理核心,模拟成两个逻辑核心,线程量增加一倍
 * 因此这里获取的是CPU的实际可用线程数量,比如 i7-8750H 它具有6核心12线程,因此获取的就是12而不是6
 * 通常CPU的实际可用线程数量越高,运行并发的程序的效率也越高
 * <p>
 * CPU的实际可用线程数量NCPU在Striped64中被用来绑定cells数组的大小
 */
static final int NCPU = Runtime.getRuntime().availableProcessors();


/**
 * 处理cells初始化、调整容量、创建新Cell等情况,并增加给定值。
 *
 * @param x              给定值
 * @param fn             累加规则函数,如果是LongAdder就传递null,表示仅仅是加法运算,如果是LongAccumulator就可以传递指定累加规则
 * @param wasUncontended 如果在add方法的a.cas调用之前就调用该方法,那么为true,表示cells未初始化或者某个Cell未初始化;否则为false,表示a.cas竞争失败
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    //获取此线程最新的probe赋值给h,如果为0,说明probe还没有初始化,即该线程第一次进入这个方法
    if ((h = getProbe()) == 0) {
        //ThreadLocalRandom.current()方法本来是获取线程自己的随机数生成器,同时会初始化线程自己的probe和threadLocalRandomSeed属性
        //这里被借用来初始化probe属性
        ThreadLocalRandom.current(); // force initialization
        //重新获取此时线程最新的probe赋值给h
        h = getProbe();
        //对于线程第一次进入的情况,如果是CAS失败的原因肯定是因为probe没有初始化才造成的CAS竞争数组0索引结点失败(0&m=0),此时wasUncontended为false
        //因此在probe初始化之后,将wasUncontended统一设置为true,表示“不存在CAS竞争”,因为觉得线程初始化之后通过这个probe找到的新索引位置是大概率不会CAS失败的
        wasUncontended = true;
    }
    //是否可能进行扩容的标志,如果为true,表示则下一次循环可能会进行扩容,如果为false,表示下一次循环一定不会进行扩容
    boolean collide = false;                // True if last slot nonempty
    /*
     * 开启一个死循环,相当于自旋,尝试处理相应的情况并且并增加给定值
     * 但是这里的自旋的效率相比于元素针对单个vlaue变量的效率高得多
     */
    for (; ; ) {
        Cell[] as;
        Cell a;
        int n;
        long v;
        /*
         * as保存此时的cells,如果as不为null,说明cells数组已经初始化
         * 并且 n保存此时的as数组的长度,如果n大于0,说明数组初始化完毕
         */
        if ((as = cells) != null && (n = as.length) > 0) {
            /*
             * 通过(n - 1) & h定位到当前线程对应的某个Cell并使用a变量保存,这个定位方式就是类似于hash函数
             * 如果a为null,说明当前位置还没有线程使用过,那么尝试在该位置新增一个Cell,对于数组某个位置新增Cell的操作需要保证线程安全
             */
            if ((a = as[(n - 1) & h]) == null) {
                //如果cellsBusy为0,表示当前没有其他线程在创建或扩容cells并且也没有创建Cell
                //即没有获取CAS锁,此时需要新建一个Cell放到该位置
                if (cellsBusy == 0) {       // Try to attach new Cell
                    //新建一个Cell对象r,内部初始值就是x
                    Cell r = new Cell(x);   // Optimistically create
                    /*
                     * 如果此时cellsBusy还是为0,那么表示当前没有其他线程在创建或扩容cells并且也没有创建Cell
                     * 然后当前线程调用casCellsBusy方法尝试将cellsBusy从0改为1,表示获取CAS锁
                     * 如果CAS成功那么可以进入下面的if代码块,用于进行对上面新创建的Cell进行赋值
                     * 如果CAS失败,说明此时有线程正在操作cells数组,那么不会进if代码块
                     * 同一时刻只能有一个线程进入此if代码块,这里相当于借助cellsBusy和CAS操作实现了一个CAS锁
                     * cellsBusy为0表示无锁,cellsBusy为1表示有线程加锁
                     */
                    if (cellsBusy == 0 && casCellsBusy()) {
                        //新建的Cell是否放入cells数组对应位置成功的标记,初始化为false表示未成功
                        boolean created = false;
                        //放置Cell的操作
                        try {               // Recheck under lock
                            Cell[] rs;
                            int m, j;
                            /*
                             * 这里相当于再检查一遍,为什么要检查呢,如果可能在上面获取as之后加锁成功之前,底层的cells数组被其他线程改变了
                             * 比如被扩容,比如该位置被其他线程抢先存入了Cell对象或者扩容后计算出的位置本来就有Cell对象等,因此在加锁之后再一次检查是很有必要的
                             *
                             * rs保存此时的cells,如果rs不为null,说明cells数组已经初始化
                             * 并且 m保存此时的rs数组的长度,如果m大于0,说明数组初始化完毕
                             * 并且 j保存前线程定位到的Cell索引,该索引的元素为null,说明当前位置还没有线程使用过
                             *
                             * 这三个条件都满足,那么进入if代码块
                             * 如果3个条件有一个不满足,那么不进入if代码块,将会进行下一次循环
                             */
                            if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                //将上面新建的Cell对象r,放入数组对应的j索引位置
                                rs[j] = r;
                                //created改为true,表示放入cells数组对应位置成功
                                created = true;
                            }
                        } finally {
                            //无论上面有没有成功,最终会将cellsBusy置为0,表示释放CAS锁
                            cellsBusy = 0;
                        }
                        //最后判断created如果为true,表示放入cells数组对应位置成功,
                        //新增值x就是该位置新增元素的默认值,表示增加给定值成功,那么break跳出这个死循环,longAccumulate方法结束
                        if (created)
                            break;
                        //到这一步,即created为false,表示放入cells数组对应位置失败,对应的索引位置非null,那么continue结束本次循环,
                        //本次竞争到了锁说明竞争不是很激烈,下一次循环中有很大概率CAS成功,也不需要后续调用advanceProbe计算新的probe值了
                        continue;           // Slot is now non-empty
                    }
                }
                //到这里,表示cellsBusy为1,即有线程正在创建cells,或者在扩容cells,或者在创建Cell;或者竞争CAS锁失败
                //collide置为false,下一次循环一定不会扩容,而是继续尝试
                collide = false;
                //由于此时并发竞争可能会比较严重,随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell
            }
            /*
             * 否则,a不为null,表示cells计算出来的索引位置已经存在Cell,如果wasUncontended为false
             * 表示此线程的probe在前面已经被初始化并且是因为add方法中的a.cas调用失败才进来该方法的
             */
            else if (!wasUncontended)       // CAS already known to fail
                //wasUncontended重置为true,下一次循环中就可能会匹配到后面的条件
                //随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell
                wasUncontended = true;      // Continue after rehash
                /*
                 * 否则,表示wasUncontended为true,可能是:
                 * 上面的!wasUncontended条件满足随后wasUncontended置为true并进行的第二次自旋
                 * 或者由于该线程getProbe()) == 0而在初始化probe之后将wasUncontended置为true
                 *
                 * 此时重新调用a.cas,尝试CAS的更新该位置的Cell的值,如果CAS成功,那么表示增加给定值成功,break跳出循环,longAccumulate方法结束
                 */
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                    fn.applyAsLong(v, x))))
                break;
                /*
                 * 否则,表示上面的CAS更新失败,说明还是发生了CAS冲突
                 * 判断如果as数组长度n大于等于CPU的实际可用线程数量NCPU,表示达到了数组最大容量,不能够再扩容了,此后只能循环CAS直到成功
                 * 或者如果此时的cells不等于as了,表示数组被扩容了,那么同样需要重新尝试CAS操作新数组
                 *
                 * 上面的条件满足一个,都会将collide设置为false,随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell
                 */
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
                /*
                 * 否则,表示as数组长度n小于CPU的实际可用线程数量NCPU,并且此时的cells等于as了,表示没有数组没有扩容
                 * 此时可以进行数组的扩容,但这里仅仅判断如果!collide为true,即collide为false,那么将collide设置为true,
                 * 随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell
                 * 即,在真正尝试扩容之前,还需要再至少自旋一次,寄希望能够CAS成功,尽量避免扩容操作
                 *
                 *
                 * collide设置为true表示下一次循环可能进行扩容,但也不是一定的,下一次循环过程中,可能遇到:
                 *      数组被扩容了,计算的新位置没有Cell,此时该线程会存放Cell,并跳出循环
                 *      CAS成功,跳出循环
                 *      遇到n >= NCPU,即数组不能再扩容了,那么该线程的扩容操作就会被取消,collide = false,此后会一致在这几个操作中循环直到CAS成功
                 *      cells != as,即数组又被扩容了,那么还会进入下一次循环,collide = false,对新数组进行尝试
                 *
                 *      前面的都不满足,此时由于上一次循环中将collide设置为了true,因此这里的!collide不满足,终于将会进入下面的else if条件,尝试进行数组的扩容
                 */
            else if (!collide)
                collide = true;
                /*
                 * 否则,表示collide为true。到这里,说明并发很严重,表示真正的可以扩容了
                 * 同样首先需要通过cellsBusy获取CAS锁,如果失败,随后会调用advanceProbe重新计算probe,并进行下一次循环,期望下一次循环时得到不同位置的Cell
                 * 但是这里collide没有被重置为false
                 */
            else if (cellsBusy == 0 && casCellsBusy()) {
                //加锁成功之后
                try {
                    //同样需要校验该数组是否是最新的数组,因为可能在加锁之前此数组被其他线程扩容了
                    if (cells == as) {      // Expand table unless stale
                        //新建Cell数组,容量为原容量的两倍
                        Cell[] rs = new Cell[n << 1];
                        //循环旧数组,将数据转移到新数组对应的索引位置
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        //将新数组赋给cells,到这里,表示扩容成功
                        cells = rs;
                    }
                } finally {
                    //无论上面有没有成功,最终会将cellsBusy置为0,表示释放CAS锁
                    cellsBusy = 0;
                }
                //到这里,表示扩容成功,或者扩容失败此时是因为其他线程了扩容了数组,因此无论如何下一次循环都会使用新数组
                //collide重置为false
                collide = false;
                //由于扩容了数组,但是还没有增加给定值,因此还需要继续循环,但是由于后面循环使用更大的数组,因此将会有更大的概率CAS成功
                //直接continue结束本次循环,也不需要后续调用advanceProbe新的probe值了
                continue;                   // Retry with expanded table
            }

            /*
             * 该轮循环中数组已经初始化的情况下,如果没有在后续的代码中break跳出循环或则continue结束本次循环的操作,那么都将会重新计算当前线程的probe值,
             * 下一次循环时就有很大概率得到另外一个的Cell位置,可以减少下次访问cells元素时的冲突概率。
             */
            //使用xorshift算法生成随机数
            h = advanceProbe(h);
        }
        /*
         * 否则,表示数组没有初始化,那么这里尝试进行数组的初始化,初始化操作同一时刻只能有一个线程进入,这里需要获取CAS锁
         *
         * 如果 此时cellsBusy为0,那么表示当前没有其他线程在创建、扩容cells并且也没有创建Cell,但是有可能是其他线程已经初始化数组完毕了
         * 并且 如果此时cells 还是等于 as,说明数组确实没有被初始化
         * 并且 如果调用casCellsBusy方法尝试将cellsBusy从0改为1成功,那么可以进入下面的代码块,用于进行数组的初始化
         *
         * 如果上面的条件都满足,那么可以尝试进行数组的初始化,否则表示数组已经被初始化了,将会进入最后一个else if条件
         */
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            //新建数组成功的标记,初始化为false表示未成功
            boolean init = false;
            //初始化数组的操作
            try {                           // Initialize table
                //同样需要校验,因为可能在加锁之前此数组被其他线程初始化了,尽管这样的概率很低
                if (cells == as) {
                    //新建Cell数组,初始容量为2
                    Cell[] rs = new Cell[2];
                    //通过 h&1 计算当前线程在该数组的位置,如果h的二进制数的最低位为1那么计算结果就是1索引,否为计算结果就是0索引
                    //随后新建一个Cell对象初始化值就是给定值x,并放入计算出的数组对应的索引位置
                    rs[h & 1] = new Cell(x);
                    //将新数组rs赋给cells,到这里,表示初始化成功,同时增加给定值成功
                    cells = rs;
                    //init设置为true
                    init = true;
                }
            } finally {
                //无论上面有没有成功,最终会将cellsBusy置为0,表示释放CAS锁
                cellsBusy = 0;
            }
            //最后判断init如果为true,表示cells数组初始化同时增加给定值成功,那么break跳出这个死循环,longAccumulate方法结束
            if (init)
                break;
            //到这一步还没有结束,表示cells数组初始化同时增加给定值失败,因为数组已被其他线程除除四化了,随后会进行下一次循环
        }
        /*
         * 否则,上面两个条件都不满足,表示其他线程正在初始化数组,或者数组已被其他线程初始化完毕,或者CAS竞争锁失败
         * 那么还是尝试使用base变量来增加给定值,如果成功那么break跳出这个死循环,longAccumulate方法结束
         * 可以看到,即使进入了longAccumulate方法,还有有可能会使用base更新的,那就是多个线程判断cells为null并同时进入的情况下
         */
        else if (casBase(v = base, ((fn == null) ? v + x :
                fn.applyAsLong(v, x))))

            break;                          // Fall back on using base
        //到这里,表示casBase失败,那么继续下一次循环
    }
}

/**
 * 尝试CAS的将cellsBusy值从0更新为1,表示获取了CAS锁
 */
final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

/**
 * 使用xorshift算法基于当前probe伪随机的生成下一个probe值
 */
static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
}

5.1.2.3 increment自增

public void decrement()

  自增1,内部就是调用的add(1L)方法。

/**
 * 自增1,内部调用add方法,参数为1
 */
public void increment() {
    add(1L);
}

5.1.2.4 sum统计

public long sum()

  返回当前总和。其实是base 的值与Cells 数组里面所有Cell元素中的value 值的累加。可以看到仅仅对base和cells数组元素value的简单累加,因此这个sum可能不是最新值,即不准确。

/**
 * 返回当前总和。其实是base 的值与Cells 数组里面所有Cell元素中的value 值的累加。
 *
 * @return 总和,非强一致性的
 */
public long sum() {
    //as保存此时的cells数组
    Cell[] as = cells;
    Cell a;
    //初始化sum,保存base
    long sum = base;
    //如果as不为null
    if (as != null) {
        //那么循环as数组将每一个元素的value相加,由于没有加CAS锁,此时的as可能不是最新的cells数组了
        for (int i = 0; i < as.length; ++i) {
            //如果某个索引位置元素不为nuull
            if ((a = as[i]) != null)
                //那么对sum进行累加
                sum += a.value;
        }
    }
    //返回sum
    return sum;
}

  longValue方法内部也是调用了sum方法:

public long longValue() {
    return sum();
}

5.1.2.5 reset重置

public void reset()

  reset为重置操作,将保持总和的变量重置为零。即base值置为0,如果有cells数组,则将每一个元素(如果存在)的value值置为0。
  该方法返回之后不代表此时总和一定为0,因为可能前面刚刚将某个位置的值置为0,后面马上被其他线程增加了值,因此这个方法也没有任何保证。

/**
 * reset为重置操作,将保持总和的变量重置为零。
 * 即base值置为0,如果有cells数组,则将每一个元素(如果存在)的value值置为0 。
 */
public void reset() {
    //as保存此时的cells数组
    Cell[] as = cells;
    Cell a;
    //base重置为0
    base = 0L;
    //如果as不为null
    if (as != null) {
        //那么循环as数组将每一个元素的value置为0,由于没有加CAS锁,此时的as可能不是最新的cells数组了
        for (int i = 0; i < as.length; ++i) {
            //如果某个索引位置元素不为nuull
            if ((a = as[i]) != null)
                //该位置元素的value置为0
                a.value = 0L;
        }
    }
}

5.1.2.6 sumThenReset统计并重置

  相当于sum()后跟reset()。

/**
 * 相当于sum()后跟reset()。
 *
 * @return 总和
 */
public long sumThenReset() {
    //as保存此时的cells数组
    Cell[] as = cells;
    Cell a;
    //初始化sum,保存base
    long sum = base;
    //base重置为0
    base = 0L;
    //如果as不为null
    if (as != null) {
        //那么循环as数组将每一个元素的value值相加,随后置为0,由于没有加CAS锁,此时的as可能不是最新的cells数组了
        for (int i = 0; i < as.length; ++i) {
            //如果某个索引位置元素不为null
            if ((a = as[i]) != null) {
                //那么对sum进行累加
                sum += a.value;
                //该位置元素的value置为0
                a.value = 0L;
            }
        }
    }
    //返回sum
    return sum;
}

5.2 LongAccumulator

5.2.1 LongAccumulator的概述

public class LongAccumulator
  extends Number
  implements Serializable

  LongAccumulator同样是来自于JDK1.8的atomic包,和LongAdder都继承了Striped64,但是LongAccumulator 比LongAdder 的功能更强大。
  LongAccumulator 相比于LongAdder,可以为累加器提供非0 的初始值,后者只能提供默认的0 值。另外,前者还可以指定累加规则,比如不进行累加而进行相乘,只需要在构造LongAccumulator 时传入自定义的双目运算器即可,后者则只能是累加规则。

5.2.2 LongAccumulator的原理

5.2.2.1 内部结构

  LongAccumulator内部具有两个自己的属性,一个LongBinaryOperator类型的双目运算器实例,用于保存累加规则。一个identity用于保存指定的初始值,也是base的初始值,在reset等重置方法调用的时候也会赋值为base的值。
  只有一个构造器,可以传递指定的累加规则和指定初始值。

/**
 * 内部的LongBinaryOperator类型的属性,用于保存传入的双目运算器
 * LongBinaryOperator是一个函数式接口,就是对累加规则的封装
 */
private final LongBinaryOperator function;

/**
 * identity用于保存的初始值,也是base的初始值,在reset等重置方法调用的时候也会赋值为base的值
 */
private final long identity;

/**
 * 使用给定的累加器和初始值创建新实例。
 *
 * @param accumulatorFunction 一个双目运算器实例,可以对两个long类型的值进行计算
 *                            如果为null那么在计算时将会抛出NullPointerException
 * @param identity            初始值
 */
public LongAccumulator(LongBinaryOperator accumulatorFunction,
                       long identity) {
    //为function赋值
    this.function = accumulatorFunction;
    //为base和identity赋值
    base = this.identity = identity;
}

5.2.2.2 accumulate更新给定值

public void accumulate(long x)

  具有给定值的更新。作为LongAccumulator的核心方法!
  accumulate方法类似于LongAdder的add方法,区别在于可以使用构造器中指定的累加器中的累加规则更新数据。
  这里的LongBinaryOperator是一个JDK1.8添加的函数式接口,主要是为了lambda的调用,这个接口封装了对两个long类型参数的操作规则,通过调用applyAsLong方法对传入的参数进行操作并返回操作的结果。

/**
 * 更新方法,与LongAdder的add方法差别在于:
 * 1.调用casBase时LongAdder传递的是b+x,LongAccumulator则使用了r = function .applyAsLong(b = b ase, x) 自定义的规则来计算。
 * 2.调用longAccumulate 时第二个参数LongAdder传递的是null,LongAccumulator传递了function累加器
 */
public void accumulate(long x) {
    Striped64.Cell[] as;
    long b, v, r;
    int m;
    Striped64.Cell a;

    if ((as = cells) != null ||
            //差别1:base要被更新为通过运算器对base和x计算出来的结果,这里也能知道传递的function不能为null
            (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended =
                        (r = function.applyAsLong(v = a.value, x)) == v ||
                                a.cas(v, r)))
            //差别2:第二个参数传递的function累加器,而不是null
            //在longAccumulate方法中,在CAS更新value或者base的时候,会判断function是否为null,即:
            //a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))
            //casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))
            //如果不为null,那么使用制定累加器规则更新
            longAccumulate(x, function, uncontended);
    }
}

5.2.3 其他操作

public long get()

  返回当前值。类似于LongAdder的sum方法,只不过由加法变成了指定的累加规则。

public long longValue()

  内部调用 get()方法。

public void reset()

  重置维持更新到标识值的变量。类似于LongAdder的reset方法,只不过base值由0变成了在构造器中传递的identity。

5.2.4 案例

  一个指定(a * b / 2)的累加规则的案例如下:

//传递一个累加器LongBinaryOperator实例
LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
    @Override
    public long applyAsLong(long left, long right) {
        //指定的累加规则 left * right / 2
        //这里的left对应LongAccumulator中的base或者某个Cell的value,这的right对应accumulate传递的参数x
        return left * right / 2;
    }
    //初始值为2
}, 2);
//更新2
longAccumulator.accumulate(6);
//获取结果,应该是6
System.out.println(longAccumulator.get());

5.2.5 LongAccumulator的总结

  LongAccumulator的功能更加强大,可以指定初始值和累加规则,这样看起来LongAdder更像是LongAccumulator的特例,即初始值为0,累加规则为加法。
  因此如果想要使用longAccumulator实现LongAdder的功能,那么我们手动将累加规则指定为加法,并且identity指定为0即可:

LongAccumulator longAccumulator = new LongAccumulator(new 
LongBinaryOperator() {
    @Override
    public long applyAsLong(long left, long right) {
        //返回left+right,这样累加规则就和LongAdder一样了
        return left + right;
    }
    //初始值为0
}, 0);
//更新6
longAccumulator.accumulate(6);
//更新1
longAccumulator.accumulate(1);
//获取结果,应该是7
System.out.println(longAccumulator.get());

  可以看到我们初始化LongAccumulator对象的代码量还是比较多的,特别是创建匿名对象的代码。上面我们说过这个LongBinaryOperator是一个函数式接口,因此我们推荐使用lambda表达式的写法:

LongAccumulator longAccumulator = new LongAccumulator((left, right) -> {
    return left + right;
}, 0);

  更进一步,我们可以使用方法引用:

LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);

  它们的含义都是一样的,即采用加法运算,但是代码量却简单了许多,所以lambda还是很有必要学习的!

6 JMH性能测试

  下面我们测试LongAdder和AtomicLong的性能差别,我们使用JMH方法性能测试。Java使用JMH进行方法性能优化测试

  本次主要测试AtomicLong的getAndIncrement方法、LongAdder的increment方法以及使用synchronized同步的方法在1秒钟之内能调用多少次,即测试方法的吞吐量。

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.Throughput)
public class AdderJMHTest {

    private static AtomicLong count = new AtomicLong();
    private static LongAdder longAdder = new LongAdder();
    private static long syn = 0L;

    public static void main(String[] args) throws Exception {
        Options options = new OptionsBuilder().include(AdderJMHTest.class.getName()).warmupIterations(1).measurementIterations(2).forks(1).build();
        new Runner(options).run();
    }

    @Benchmark
    @Threads(10)
    public void run0() {
        count.getAndIncrement();
    }

    @Benchmark
    @Threads(10)
    public void run1() {
        longAdder.increment();
    }

    @Benchmark
    @Threads(10)
    public void run2() {
        synchronized (AdderJMHTest.class) {
            ++syn;
        }
    }
}

  在开启JIT优化的情况下,开启1个线程,结果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2  155.021          ops/us
AdderJMHTest.run1  thrpt    2  124.791          ops/us
AdderJMHTest.run2  thrpt    2   57.716          ops/us

  在开启JIT优化的情况下,开启2个线程,结果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   56.432          ops/us
AdderJMHTest.run1  thrpt    2  243.411          ops/us
AdderJMHTest.run2  thrpt    2   32.125          ops/us

  在开启JIT优化的情况下,开启5个线程,结果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   52.174          ops/us
AdderJMHTest.run1  thrpt    2  486.320          ops/us
AdderJMHTest.run2  thrpt    2   36.689          ops/us

  在开启JIT优化的情况下,开启10个线程,结果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   48.207          ops/us
AdderJMHTest.run1  thrpt    2  756.315          ops/us
AdderJMHTest.run2  thrpt    2   31.929          ops/us

  在开启JIT优化的情况下,开启20个线程,结果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   50.508          ops/us
AdderJMHTest.run1  thrpt    2  791.501          ops/us
AdderJMHTest.run2  thrpt    2   36.743          ops/us

  在开启JIT优化的情况下,开启50个线程,结果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   52.193          ops/us
AdderJMHTest.run1  thrpt    2  800.270          ops/us
AdderJMHTest.run2  thrpt    2   31.817          ops/us

  在开启JIT优化的情况下,开启100个线程,结果如下:

Benchmark           Mode  Cnt    Score   Error   Units
AdderJMHTest.run0  thrpt    2   51.982          ops/us
AdderJMHTest.run1  thrpt    2  842.155          ops/us
AdderJMHTest.run2  thrpt    2   34.325          ops/us

  可见,synchronized吞吐量最少。如果没有线程竞争,那么LongAdder和LongAccumulator的吞吐量差不多。如果线程竞争较多,那么LongAdder吞吐量降低,LongAccumulator吞吐量继续升高,是AtomicLong的是十倍以上。
  在使用-Xint参数关闭JIT优化的情况下,开启10个线程,结果如下:

Benchmark           Mode  Cnt   Score   Error   Units
AdderJMHTest.run0  thrpt    2   4.781          ops/us
AdderJMHTest.run1  thrpt    2  15.816          ops/us
AdderJMHTest.run2  thrpt    2   3.925          ops/us

  虽然它们的总体性能都严重降低,但是LongAdder的吞吐量仍然最大,这也说明Java的JIT优化的牛逼之处。

7 atomic的总结

  JDK1.5出现的atomic包下面的原子类,在对于单个变量的复合操作(比如读-写)中可以代替锁的来保证操作的原子性和安全性,并且由于没有使用锁而有不错的性能,但是对于多个变量的复合操作以及一批代码的原子性和安全性却无能为力,此时只能使用锁。
  我们可以看到,实际上volatile关键字以及Unsafe类提供的CAS的方法就是构成原子类的基石,原子类的方法实际上就是对于Unsafe中的CAS方法的二次包装,方法开发人员使用而已。Unsafe中的CAS方法作为native方法,本身并不是Java语言实现的,它们的源码位于JVM虚拟机的源码中,HotSpot虚拟机的源码中就有这些native方法的具体实现,它们都是采用C++的代码实现的,方便与底层系统交互,在openjdk中可以找到。
  本文没有对一些基本知识点做深入讲解,比如Unsafe、volatile、CAS、伪共享、JMH等,因为前面的文章中已经讲了,都是深入到了虚拟机源码级别,如果想要深入了解原子类的原理,应该要看看以下文章!

本文地址:https://blog.csdn.net/weixin_43767015/article/details/107895944