AtomicInteger 底层实现原理是什么? 如何在自己代码中应用 CAS 操作?
AtomicInteger 底层实现原理是什么? 如何在自己代码中应用 CAS 操作
AtomicInteger 是对 int 类型的一个封装,提供原子性的访问和更新操作,其原子性的操作实现是基于 CAS (compare-and-swap)技术。
- https://en.wikipedia.org/wiki/Compare-and-swap
CAS,表征的是一些列操作的集合,获取当前数值,进行一些运算,利用 CAS 指令试图进行更新,如果当前数值不变,代码没有其他线程进行并发修改,则成功更新。否则,可能出现不同的选择,要么进行重试,要么就反应一个成功或者失败的结果。
ActomicInteger 实现原理
ActomicInteger 的内部属性可以看到,它是依赖 Unsafe 的一些底层能力,进行底层操作,以 volatile 的 value 字段,记录数值,以保证可见性。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
Unsafe 会利用 value 字段的内存地址便宜,直接完成操作。
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
getAndIncrement 是需要明确返回值的,因此 getAndAddInt 实现是需要失败重试,最后拿到返回值的。
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
CompareAndset 这样的直接返回 Boolean 值,不需要失败重试。
CAS 底层如何实现
CAS 底层实现,依赖 CPU 特定指令, 具体根据体系的不同还存在明显的区别。
例如,x86 CPU 提供 cmpxchg 指令。而在精简指令集的体系架构中,则通常是靠一对儿指令(如" load and reserve"和" store conditional")实现的,在大多数处理器上CAS都是个非常轻量级的操作,这也是其优势所在。
CAS 使用场景
可以设想这样一个场景:在数据库产品中,为保证索引的一致性,一个常见的选择是,保证只有一个线程能够排他性地修改一个索引分区,如何在数据库抽象层实现?
可以考虑为索引分区对象添加一个逻辑上的锁,例如,以当前独占的线程ID作为锁的数值,然后通过原子操作设置lock数值,来实现加锁和释放锁,伪代码如下:
public class AtomicBTreePartition {
private volatile long lock;
public void acquireLock();
public void releaseeLock();
}
使用 AtomicLongFieldUpdater 替代 Unsafe
那么在Java代码中,我们怎么实现锁操作呢? Unsafe 似乎不是个好的选择,例如,我就注意到类似 Cassandra等产品,因为Java9中移除了 Unsafe.moniter Enter()/moniterEXit(),导致无法平滑升级到新的JDK版本。目前Java提供了两种公共API,可以实现这种CAS操作,比如使用 java.util.concurrent.atomic.AtomicLongFieldUpdater,它是基于反射机制创建,我们需要保证类型和字段名称正确。
private static final AtomicLongFieldupdater<AtomicBTreePartition> lockFieldUpdater = AtomicLongFieldupdater.newUpdater(AtomicBTreePartition. class,"lock");
private void acquireLock(){
long t= Thread currentThread().getId();
while(!lockFieldUpdater.compareAndSet(this, 0L, t)){
// 等待一会儿数据库架可比较慢
...
}
}
VariableHadnle 替换 Unsafe
如果是Java9以后,我们完全可以釆用另外一种方式实现,也就是 Variable handle api,这是源自于JEp193,提供了各种粒度的原子或者有序性的操作等。
- http://openjdk.java.net/jeps/193
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle(AtomicBTreePartition. class, "lock");
private void acquireLock(){
long t= Thread currentThread().getId();
while(!HANDLE.compareAndSet(this, 0L, t)){
// 等待一会儿数据库架可比较慢
...
}
}
CAS 副作用
-
可能过度消耗 CPU
试想,其常用的失败重试机制,隐含着一个假设,即假设竞争情况是短暂的。大多数应用场景中,确实大部分重试只会发生一次就获得了成功,但是总是有意外情况,所以在有需要的时候,还是要考虑限制自旋的次数,以免过度消耗CPU -
ABA 问题
这是通常只在lock-free算法下暴露的问题。我前面说过CAS是在更新时比较前值,如果对方只是恰好相同,例如期间发生了A->B->A的更新,仅仅判断数值是A,可能导致不合理的修改操作。针对这种情况,Java提供了 AtomicStampedReference工具类,通过为引用建立类似版本号(stamp)的方式,来保证CAS的正确性,具体用法请參考这里的介绍
- https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/AtomicStampedReference.html
- http://tutorials.jenkov.com/java-util-concurrent/atomicstampedreference.html
String initialRef = "initial value referenced";
int initialStamp = 0;
AtomicStampedReference<String> atomicStringReference =
new AtomicStampedReference<String>(
initialRef, initialStamp
);
String newRef = "new value referenced";
int newStamp = initialStamp + 1;
boolean exchanged = atomicStringReference
.compareAndSet(initialRef, newRef, initialStamp, newStamp);
System.out.println("exchanged: " + exchanged); //true
exchanged = atomicStringReference
.compareAndSet(initialRef, "new string", newStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged); //false
exchanged = atomicStringReference
.compareAndSet(newRef, "new string", initialStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged); //false
exchanged = atomicStringReference
.compareAndSet(newRef, "new string", newStamp, newStamp + 1);
System.out.println("exchanged: " + exchanged); //true
AbstractQueuedSynchronizer (AQS)
理解为什么需要AQS,如何使用AQS,至少要做什么,再进一步结合JDK源代码中的实践,理解AQS的原理与应用Doug Lea曾经介绍过AQS的设计初衷。比如 Semaphore 就选择了将基础的同步相关操作抽象在 AbstractQueuedSynchronizer 中,利用AQS为我们构建冋步结构握供了范本。
AQS 内部数据和方法
- 一个 volatile的整数成员表征状态,同时提供了 setstate和 getstate方法
private volatile int state
- 一个先入先出(FIFO)的等待线程队列,以实现多线程间竞争和等待,这是AQS机制的核心之一
- 各种基于CAS的基础操作方法,以及各种期望具体同步结构去实现的 acquire/ release方法。
利用AQS实现一个同步结构,至少要实现两个基本类型的方法,分别是 acquire操作,获取资源的独占权;还有就是 release操作,释放对某个资源的独占。
AQS 的应用场景
ReentrantLock 就是基于AQS 实现的
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
ReentrantLock 的 acquire ,release操作
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
看 sync aquire 内部实现。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
排除掉一些细节,整体地分析 acquire方法逻辑,其直接实现是在AQS内部,调用了 tryAcquire 和 acquireQueued,这是两个需要搞淸楚的基本部分。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
以非公平的 tryAcquire为例,其内部实现了如何配合状态与CAS获取锁,注意,对比公平版本的 tryAcquire,它在锁无人占有时,并不检查是否有其他等待者,这里体现了非公平的。
非公平版本:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平版本的实现会检查队列中是否有等待者
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
接下来我再来分析 acquireQueued,如果前面的 tryAcquire失败,代表着锁争抢失败,进入排队竞争阶段。这里就是我们所说的,利用FIFO队列,实现线程间对锁的竞争的部分算是AQS的核心逻辑。
acquireQueued 实现原理
当前线程会被包装成为一个排他模式的节点( EXCLUSIVE),通过 addWaiter方法添加到队列中。 acquireQueued 的逻辑,简要来说,就是如果当前节点的前面是头节点,则试图获取锁,一切顺利则成为新的头节点;否则,有必要则等待,具体处理逻辑请參考我添加的注释。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {// 循环
final Node p = node.predecessor();// 获取前一个节点
if (p == head && tryAcquire(arg)) {// 如果前一个节点是头结点,标识当前节点合适 tryAcquire
setHead(node);// acuire 成功,则设置新头节点。
p.next = null; // help GC 将前面节点对当前节点的引用清空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 检查是否失败后需要park, 然后循环去入队
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 出现一次,取消
}
}
// 入队逻辑
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
到这里线程试图获取锁的过程基本展现出来了,tryAcquire是按照特定场景需要开发者去实现的部分,而线程间竞争则是AQS通过Waiter队列与 acquire在 release方法中,同样会对队列进行对应操作。
程序员开发者社区
上一篇: Linux 系统添加 Swap 交换分区
下一篇: Linux线程同步---互斥量