AtomicInteger底层实现原理是什么?如何在自己的产品代码中应用CAS操作?
今天主要来围绕CAS技术来说一下,分析并发包的内部结构,看看内部结构、线程池,是基于哪一种原理去实现的。
今天的问题主要是,AtomicInteger底层实现原理是什么?如何在自己的产品代码中应用CAS操作?
概述
AtomicInteger是对int类型的一个封装,提供原子性的访问和更新操作,原子性操作基于CAS(compare-and-swap)技术
CAS(compare-and-swap)
:在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。 该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。(from wiki)
简单来说,CAS避免了因为一条线程出现错误而导致让其他线程也受到影响,保证了并发的安全性。
所谓CAS,就是获取到修改的数据,利用CAS指令进行试图更新。如果数值没变,代表其他线程没有对齐修改,就成功更新,否则,就将修改成功与失败的结果返回,或者重新请求更新。
可以来看看AtomicInteger的底层,它主要依赖于Unsafe进行操作,以volatile的value字段保证可见性。(volatile在之前讲过如何实现可见性,cache与主存,你懂得)
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
具体原子操作细节,可以参照原子更新,Unsafe会利用value字段的内存地址偏移,直接完成操作。
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
因为getAndIncrement需要返回值,所以要添加失败重试逻辑。
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
像以上所说,不重新申请,也可以直接返回是与否:
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
CAS 是java并发编程中lock-free的基础
考点分析
虽然开发中很少涉及到CAS的底层实现,但是掌握在Java中如何实现该技术还是很重要的。
面试往往会问到:
-
什么时候使用到CAS,有没有比 调用Unsafe更好的实现方法?
-
理解ReentrantLock,CycliBarrier并发结构的实现
扩展
假设在数据库,我们要求一个索引只固定修改一个索引分析,如何在sql抽象层去实现呢?
可以考虑为索引分区对象添加一个逻辑上的锁,使用独占的线程id为锁的数值,利用原子操作控制锁的施加与释放,代码如下:
public class AtomicBTreePartition{
private volatile long lock;
public void acquireLock(){};
public void releaseLock(){};
}
目前实现锁操作。
第一种方法:可以使用java内部提供的基于反射机制的java.util.concurrent.atomic.AtomicLongFieldUpdater实现CAS操作,但需要保证类型和字段名称正确。
第二种方法,就是使用是Variable Handle API,提供了各种粒度的原子或者有序性的操作。前面的代码修改如下:
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle
(AtomicSample.class, "lock");
private void acquiredLock() {
long t = Thread.currentThread().getId();
while (!HANDLE.compareAndSet(this, 0L, t)) {
//等待....数据操作延迟
}
}
过程是先获取到变量句柄,然后直接调用提供的CAS方法。
一般来说推荐第二种方法,但是CAS也有着以下的副作用:当出现以外,大批量线程频繁重新请求重试,就会给CPU带来很大负担。
下面来介绍一下AbstractQueuedSynchronizer(AQS)
它是在Java并发包中,实现各种同步结构和部分其他组成单元(如线程池中的Worker)的基础。
首先要理解,为什么需要AQS,如何使用AQS,至少要做什么,再进一步结合JDK源码中的实践,理解AQS的原理和应用。
AQS的内部数据和方法,可以简单拆分为:
-
一个volatile的整数成员表征状态,同时提供了setState和getState方法
private volatile int state;
-
一个先入先出的等待线程队列,以实现多线程之间的竞争和等待
-
基于CAS结构同步实现acquire/release方法
以ReentrantLock为例,它内部通过扩展AQS实现了Sync类型,以AQS的state来反映锁的持有情况。
private final Sync sync;
abstract static class Sync extends AbstractQueueSynchronizer{}
然后是ReentrantLock对应的acquire和release操作,如果是CountDownLatch则可以看做是await()/countDown(),具体实现也有区别。
public void Lock(){
sync.acquire(1);
}
public void unlock(){
sync.release(1);
}
排除掉一些细节,整体地分析acquire方法逻辑,其直接实现是在AQS内部,调用了tryAcquire和acquireQueued
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
在ReentrantLock中,tryAcquire逻辑实现在NonfairSync和FairSync中,分别提供了进一步的非公平或公平性方法。
代码如下:
public ReentrantLock(){
sync = new NonfairSync();//默认非公平
}
public ReentrantLock(boolean fair){
sync = fair ? new FairSync():new NonfairSync();
}
再来看看AQS内部的tryAcquire,内部实现了如何配合CAS获取锁,以非公平的tryAcquire举例,它并不会检查其他等待者。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果前面的tryAcquire失败,代表着锁争抢失败,进入排队竞争阶段,就会用FIFO队列,实现线程间对锁的竞争的部分,是AQS的核心逻辑。
当前线程会被包装成为一个排他模式的节点(EXCLUSIVE),通过addWaiter方法添加到队列中。
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
到这里线程试图获取锁的过程基本展现出来了,tryAcquire是按照特定场景需要开发者去实现,线程竞争是通过AQS通过Waiter队列与acquireQueued提供的,在release方法中,同样会对队列进行操作。
上一篇: MySQL索引优化(二)索引失效
下一篇: Database