欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AtomicInteger 底层实现原理是什么? 如何在自己代码中应用 CAS 操作?

程序员文章站 2022-05-05 10:29:02
...

AtomicInteger 底层实现原理是什么? 如何在自己代码中应用 CAS 操作

AtomicInteger 是对 int 类型的一个封装,提供原子性的访问和更新操作,其原子性的操作实现是基于 CAS (compare-and-swap)技术。

  • https://en.wikipedia.org/wiki/Compare-and-swap

CAS,表征的是一些列操作的集合,获取当前数值,进行一些运算,利用 CAS 指令试图进行更新,如果当前数值不变,代码没有其他线程进行并发修改,则成功更新。否则,可能出现不同的选择,要么进行重试,要么就反应一个成功或者失败的结果。

ActomicInteger 实现原理

ActomicInteger 的内部属性可以看到,它是依赖 Unsafe 的一些底层能力,进行底层操作,以 volatile 的 value 字段,记录数值,以保证可见性。

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
    try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

Unsafe 会利用 value 字段的内存地址便宜,直接完成操作。

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!compareAndSwapInt(o, offset, v, v + delta));
        return v;
    }

getAndIncrement 是需要明确返回值的,因此 getAndAddInt 实现是需要失败重试,最后拿到返回值的。

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

CompareAndset 这样的直接返回 Boolean 值,不需要失败重试。

CAS 底层如何实现

CAS 底层实现,依赖 CPU 特定指令, 具体根据体系的不同还存在明显的区别。
例如,x86 CPU 提供 cmpxchg 指令。而在精简指令集的体系架构中,则通常是靠一对儿指令(如" load and reserve"和" store conditional")实现的,在大多数处理器上CAS都是个非常轻量级的操作,这也是其优势所在。

CAS 使用场景

可以设想这样一个场景:在数据库产品中,为保证索引的一致性,一个常见的选择是,保证只有一个线程能够排他性地修改一个索引分区,如何在数据库抽象层实现?

可以考虑为索引分区对象添加一个逻辑上的锁,例如,以当前独占的线程ID作为锁的数值,然后通过原子操作设置lock数值,来实现加锁和释放锁,伪代码如下:

public class AtomicBTreePartition {
    private volatile long lock;
    public void acquireLock();
    public void releaseeLock();
}   

使用 AtomicLongFieldUpdater 替代 Unsafe

那么在Java代码中,我们怎么实现锁操作呢? Unsafe 似乎不是个好的选择,例如,我就注意到类似 Cassandra等产品,因为Java9中移除了 Unsafe.moniter Enter()/moniterEXit(),导致无法平滑升级到新的JDK版本。目前Java提供了两种公共API,可以实现这种CAS操作,比如使用 java.util.concurrent.atomic.AtomicLongFieldUpdater,它是基于反射机制创建,我们需要保证类型和字段名称正确。

private static final AtomicLongFieldupdater<AtomicBTreePartition> lockFieldUpdater = AtomicLongFieldupdater.newUpdater(AtomicBTreePartition. class,"lock");
private void acquireLock(){
   long t= Thread currentThread().getId();
   while(!lockFieldUpdater.compareAndSet(this, 0L, t)){
      // 等待一会儿数据库架可比较慢
      ...
   }
}

VariableHadnle 替换 Unsafe

如果是Java9以后,我们完全可以釆用另外一种方式实现,也就是 Variable handle api,这是源自于JEp193,提供了各种粒度的原子或者有序性的操作等。

  • http://openjdk.java.net/jeps/193
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle(AtomicBTreePartition. class, "lock");
private void acquireLock(){
   long t= Thread currentThread().getId();
   while(!HANDLE.compareAndSet(this, 0L, t)){
      // 等待一会儿数据库架可比较慢
      ...
   }
}

CAS 副作用

  1. 可能过度消耗 CPU
    试想,其常用的失败重试机制,隐含着一个假设,即假设竞争情况是短暂的。大多数应用场景中,确实大部分重试只会发生一次就获得了成功,但是总是有意外情况,所以在有需要的时候,还是要考虑限制自旋的次数,以免过度消耗CPU

  2. ABA 问题
    这是通常只在lock-free算法下暴露的问题。我前面说过CAS是在更新时比较前值,如果对方只是恰好相同,例如期间发生了A->B->A的更新,仅仅判断数值是A,可能导致不合理的修改操作。针对这种情况,Java提供了 AtomicStampedReference工具类,通过为引用建立类似版本号(stamp)的方式,来保证CAS的正确性,具体用法请參考这里的介绍

  • https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/AtomicStampedReference.html
  • http://tutorials.jenkov.com/java-util-concurrent/atomicstampedreference.html
String initialRef   = "initial value referenced";
int    initialStamp = 0;

AtomicStampedReference<String> atomicStringReference =
    new AtomicStampedReference<String>(
        initialRef, initialStamp
    );

String newRef   = "new value referenced";
int    newStamp = initialStamp + 1;

boolean exchanged = atomicStringReference
    .compareAndSet(initialRef, newRef, initialStamp, newStamp);
System.out.println("exchanged: " + exchanged);  //true

exchanged = atomicStringReference
    .compareAndSet(initialRef, "new string", newStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged);  //false

exchanged = atomicStringReference
    .compareAndSet(newRef, "new string", initialStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged);  //false

exchanged = atomicStringReference
    .compareAndSet(newRef, "new string", newStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged);  //true

AbstractQueuedSynchronizer (AQS)

理解为什么需要AQS,如何使用AQS,至少要做什么,再进一步结合JDK源代码中的实践,理解AQS的原理与应用Doug Lea曾经介绍过AQS的设计初衷。比如 Semaphore 就选择了将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用AQS为我们构建冋步结构握供了范本。

AtomicInteger 底层实现原理是什么? 如何在自己代码中应用 CAS 操作?

AQS 内部数据和方法

  • 一个 volatile的整数成员表征状态,同时提供了 setstate和 getstate方法
private volatile int state
  • 一个先入先出(FIFO)的等待线程队列,以实现多线程间竞争和等待,这是AQS机制的核心之一
  • 各种基于CAS的基础操作方法,以及各种期望具体同步结构去实现的 acquire/ release方法。

利用AQS实现一个同步结构,至少要实现两个基本类型的方法,分别是 acquire操作,获取资源的独占权;还有就是 release操作,释放对某个资源的独占。

AQS 的应用场景

ReentrantLock 就是基于AQS 实现的

private final Sync sync;

/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

ReentrantLock 的 acquire ,release操作

public void lock() {
    sync.lock();
}
 public void unlock() {
    sync.release(1);
}

看 sync aquire 内部实现。

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

排除掉一些细节,整体地分析 acquire方法逻辑,其直接实现是在AQS内部,调用了 tryAcquireacquireQueued,这是两个需要搞淸楚的基本部分。

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

以非公平的 tryAcquire为例,其内部实现了如何配合状态与CAS获取锁,注意,对比公平版本的 tryAcquire,它在锁无人占有时,并不检查是否有其他等待者,这里体现了非公平的。

非公平版本:

 final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

公平版本的实现会检查队列中是否有等待者

 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

接下来我再来分析 acquireQueued,如果前面的 tryAcquire失败,代表着锁争抢失败,进入排队竞争阶段。这里就是我们所说的,利用FIFO队列,实现线程间对锁的竞争的部分算是AQS的核心逻辑。

acquireQueued 实现原理

当前线程会被包装成为一个排他模式的节点( EXCLUSIVE),通过 addWaiter方法添加到队列中。 acquireQueued 的逻辑,简要来说,就是如果当前节点的前面是头节点,则试图获取锁,一切顺利则成为新的头节点;否则,有必要则等待,具体处理逻辑请參考我添加的注释。

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {// 循环
                final Node p = node.predecessor();// 获取前一个节点
                if (p == head && tryAcquire(arg)) {// 如果前一个节点是头结点,标识当前节点合适 tryAcquire
                    setHead(node);// acuire 成功,则设置新头节点。
                    p.next = null; // help GC 将前面节点对当前节点的引用清空
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())// 检查是否失败后需要park, 然后循环去入队
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);  // 出现一次,取消
        }
    }
  
//  入队逻辑
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
   

到这里线程试图获取锁的过程基本展现出来了,tryAcquire是按照特定场景需要开发者去实现的部分,而线程间竞争则是AQS通过Waiter队列与 acquire在 release方法中,同样会对队列进行对应操作。

程序员开发者社区

AtomicInteger 底层实现原理是什么? 如何在自己代码中应用 CAS 操作?

相关标签: Java