学习非阻塞的同步机制CAS
在研究线程池的执行原理时,看到一段不断循环重试的代码,不理解它的原理,看注释这是cas的实现,所以学会之后记录下来。
锁有什么劣势
在多线程并发下,可以通过加锁来保证线程安全性,但多个线程同时请求锁,很多情况下避免不了要借助操作系统,线程挂起和恢复会存在很大的开销,并存在很长时间的中断。一些细粒度的操作,例如同步容器,操作往往只有很少代码量,如果存在锁并且线程激烈地竞争,调度的代价很大。
总结来说,线程持有锁,会让其他需要锁的线程阻塞,产生多种风险和开销。加锁是一种悲观方法,线程总是设想在自己持有资源的同时,肯定有其他线程想要资源,不牢牢锁住资源还不能放心呢。
在硬件的支持下,出现了非阻塞的同步机制,其中一种常用实现就是cas。
什么是cas
现代的处理器都包含对并发的支持,其中最通用的方法就是比较并交换(compare and swap),简称cas。
cas 操作包含三个操作数 —— 内存位置(v)、预期原值(a)和新值(b)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论v值是否等于a值,都将返回v的原值。cas 有效地说明了:我认为位置 v 应该包含值 a;如果包含该值,则将 b 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。
当多个线程尝试使用cas同时更新一个变量,最终只有一个线程会成功,其他线程都会失败。但和使用锁不同,失败的线程不会被阻塞,而是被告之本次更新操作失败了,可以再试一次。此时,线程可以根据实际情况,继续重试或者跳过操作,大大减少因为阻塞而损失的性能。所以,cas是一种乐观的操作,它希望每次都能成功地执行更新操作。
public class simulationcas { private int value; public synchronized int get() { return value; } public synchronized boolean compareandset(int expectedvalue, int newvalue) { if (expectedvalue == compareandswap(expectedvalue, newvalue)) { return true; } return false; } public synchronized int compareandswap(int expectedvalue, int newvalue) { int oldvalue = value; if (oldvalue == expectedvalue) { value = newvalue; } return oldvalue; } }
上面的代码模拟了cas的操作,其中compareandswap是cas语义的体现,compareandset对value进行了更新操作,并返回成功与否。
几行代码就实现了cas,是不是觉得很简单呢?但你要知道,cas仅仅告诉你操作结果,操作失败后一系列重试回退放弃等操作都要自己实现,开发起来远比使用锁复杂。
atom原子类
jvm是支持cas的,体现在我们常用的atom原子类,拿atomicinteger分析一下源码。
public final int getandincrement() { for (;;) { int current = get(); int next = current + 1; if (compareandset(current, next)) return current; } }
对atomicinteger进行+1操作,循环里,会将当前值和+1后的目标值传入compareandset,直到成功才跳出方法。compareandset是不是很熟悉呢,接着来看看它的代码。
// setup to use unsafe.compareandswapint for updates private static final unsafe unsafe = unsafe.getunsafe(); public final boolean compareandset(int expect, int update) { return unsafe.compareandswapint(this, valueoffset, expect, update); }
compareandset调用了unsafe.compareandswapint,这是一个native方法,原理就是调用硬件支持的cas方法。看懂这个应该就能明白atom类的原理,其他方法的实现是类似的。
线程池里的cas
有了cas的基础后,可以来研究那段我未看懂的代码。
提交一个执行任务,线程池会尝试增加一个工作线程去处理任务。下面是threadpoolexecutor里addworker的一段代码:
private boolean addworker(runnable firsttask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runstateof(c); // check if queue empty only if necessary. if (rs >= shutdown && ! (rs == shutdown && firsttask == null && ! workqueue.isempty())) return false; for (;;) { int wc = workercountof(c); if (wc >= capacity || wc >= (core ? corepoolsize : maximumpoolsize)) return false; if (compareandincrementworkercount(c)) break retry; c = ctl.get(); // re-read ctl if (runstateof(c) != rs) continue retry; // else cas failed due to workercount change; retry inner loop } } //其他省略
在内循环里,会调用compareandincrementworkercount方法增加一个工作线程,原理和atomicinteger的getandincrement方法是一样的。如果增加成功,直接跳出循环,否则在检查线程池状态后,再次在内循环调用compareandincrementworkercount,直到添加成功。
现在再看代码,瞬间就明白了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。