Java并发编程之你们常说的CAS到底是个啥?!
独占锁是一种悲观锁,synchronized
就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。
而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是CAS,Compare and Swap
。
简单的理解,CAS操作包含三个操作数:内存值(V)、预期原值(A)、新值(B)。
如果内存值(V)与预期原值(A)相匹配,那么就会自动将该内存位置的值(V)更新为新值(B)。否则,将不更新该位置的值,只要告诉我现在这个位置的值即可。V == A ? V = B
当多个线程尝试使用CAS同时更新一个变量,最终只有一个线程会成功,其他线程都会失败,和synchronized不同的是,失败的线程不会被阻塞,而是被告知本次更新操作失败了,可以再试一次。线程可以根据实际情况,再次重试或者跳过,大大减少因为阻塞而损失的性能。所以CAS是一个乐观操作。
Atom原子类中的CAS
我们常用的Atom原子类中也有CAS的体现。
比如AtomicInteger
中的getAndIncrement
方法。
//AtomicInteger
private volatile int value;
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//Unsafe
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
上述代码中,value被
volatile
关键字修饰,可以保证value的可见性。首先根据偏移量拿到当前内存中的内存值(V)赋值给var5,将其作为期望原值(A),调用compareAndSwapInt
方法,将var5+var4
作为新值(B),在compareAndSwapInt
方法中,再次根据偏移量拿到最新的内存值(V),和期望原值(A) var5进行比较,如果相同则替换,返回true。如果不同则不做操作,返回false,再走一遍上述操作。
compareAndSwapLong
这是一个native方法,调用的是底层c语言的代码。
线程池中的CAS
我们直接看ThreadPoolExecutor
中的addWorker
方法中的一段代码。
//ThreadPoolExecutor
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
//下面省略
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//AtomicInteger
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//Unsafe
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
}
retry:这retry就是一个标记,标记对一个循环方法的操作(continue和break)处理点,功能类似于goto,所以retry一般都是伴随着for循环出现,retry:标记的下一行就是for循环,在for循环里面调用continue(或者break)再紧接着retry标记时,就表示从这个地方开始执行continue(或者break)操作
这段代码中的操作和上面AtomicInteger中是否很类似呢?自旋,然后ctl的值做CAS操作。如果增加成功,直接跳出循环,否则在检查线程池状态后,再次在内循环调用compareAndIncrementWorkerCount,直到添加成功。
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}