Java 高并发十: JDK8对并发的新支持详解
1. longadder
和atomiclong类似的使用方式,但是性能比atomiclong更好。
longadder与atomiclong都是使用了原子操作来提高性能。但是longadder在atomiclong的基础上进行了热点分离,热点分离类似于有锁操作中的减小锁粒度,将一个锁分离成若干个锁来提高性能。在无锁中,也可以用类似的方式来增加cas的成功率,从而提高性能。
longadder原理图:
atomiclong的实现方式是内部有个value 变量,当多线程并发自增,自减时,均通过cas 指令从机器指令级别操作保证并发的原子性。唯一会制约atomiclong高效的原因是高并发,高并发意味着cas的失败几率更高, 重试次数更多,越多线程重试,cas失败几率又越高,变成恶性循环,atomiclong效率降低。
而longadder将把一个value拆分成若干cell,把所有cell加起来,就是value。所以对longadder进行加减操作,只需要对不同的cell来操作,不同的线程对不同的cell进行cas操作,cas的成功率当然高了(试想一下3+2+1=6,一个线程3+1,另一个线程2+1,最后是8,longadder没有乘法除法的api)。
可是在并发数不是很高的情况,拆分成若干的cell,还需要维护cell和求和,效率不如atomiclong的实现。longadder用了巧妙的办法来解决了这个问题。
初始情况,longadder与atomiclong是相同的,只有在cas失败时,才会将value拆分成cell,每失败一次,都会增加cell的数量,这样在低并发时,同样高效,在高并发时,这种“自适应”的处理方式,达到一定cell数量后,cas将不会失败,效率大大提高。
longadder是一种以空间换时间的策略。
2. completablefuture
实现completionstage接口(40余个方法),大多数方法多数应用在函数式编程中。并且支持流式调用
completablefuture是java 8中对future的增强版
简单实现:
import java.util.concurrent.completablefuture; public class askthread implements runnable { completablefuture<integer> re = null; public askthread(completablefuture<integer> re) { this.re = re; } @override public void run() { int myre = 0; try { myre = re.get() * re.get(); } catch (exception e) { } system.out.println(myre); } public static void main(string[] args) throws interruptedexception { final completablefuture<integer> future = new completablefuture<integer>(); new thread(new askthread(future)).start(); // 模拟长时间的计算过程 thread.sleep(1000); // 告知完成结果 future.complete(60); } }
future最令人诟病的就是要等待,要自己去检查任务是否完成了,在future中,任务完成的时间是不可控的。而 completablefuture的最大改进在于,任务完成的时间也开放了出来。
future.complete(60);
用来设置完成时间。
completablefuture的异步执行:
public static integer calc(integer para) { try { // 模拟一个长时间的执行 thread.sleep(1000); } catch (interruptedexception e) { } return para * para; } public static void main(string[] args) throws interruptedexception, executionexception { final completablefuture<integer> future = completablefuture .supplyasync(() -> calc(50)); system.out.println(future.get()); } completablefuture的流式调用: public static integer calc(integer para) { try { // 模拟一个长时间的执行 thread.sleep(1000); } catch (interruptedexception e) { } return para * para; } public static void main(string[] args) throws interruptedexception, executionexception { completablefuture<void> fu = completablefuture .supplyasync(() -> calc(50)) .thenapply((i) -> integer.tostring(i)) .thenapply((str) -> "\"" + str + "\"") .thenaccept(system.out::println); fu.get(); }
组合多个completablefuture:
public static integer calc(integer para) { return para / 2; } public static void main(string[] args) throws interruptedexception, executionexception { completablefuture<void> fu = completablefuture .supplyasync(() -> calc(50)) .thencompose( (i) -> completablefuture.supplyasync(() -> calc(i))) .thenapply((str) -> "\"" + str + "\"") .thenaccept(system.out::println); fu.get(); }
这几个例子更多是侧重java8的一些新特性,这里就简单举下例子来说明特性,就不深究了。
completablefuture跟性能上关系不大,更多的是为了支持函数式编程,在功能上的增强。当然开放了完成时间的设置是一大亮点。
3. stampedlock
在上一篇中刚刚提到了锁分离,而锁分离的重要的实现就是readwritelock。而stampedlock则是readwritelock的一个改进。stampedlock与readwritelock的区别在于,stampedlock认为读不应阻塞写,stampedlock认为当读写互斥的时候,读应该是重读,而不是不让写线程写。这样的设计解决了读多写少时,使用readwritelock会产生写线程饥饿现象。
所以stampedlock是一种偏向于写线程的改进。
stampedlock示例:
import java.util.concurrent.locks.stampedlock; public class point { private double x, y; private final stampedlock sl = new stampedlock(); void move(double deltax, double deltay) { // an exclusively locked method long stamp = sl.writelock(); try { x += deltax; y += deltay; } finally { sl.unlockwrite(stamp); } } double distancefromorigin() { // a read-only method long stamp = sl.tryoptimisticread(); double currentx = x, currenty = y; if (!sl.validate(stamp)) { stamp = sl.readlock(); try { currentx = x; currenty = y; } finally { sl.unlockread(stamp); } } return math.sqrt(currentx * currentx + currenty * currenty); } }
上述代码模拟了写线程和读线程, stampedlock根据stamp来查看是否互斥,写一次stamp变增加某个值
tryoptimisticread()
就是刚刚所说的读写不互斥的情况。
每次读线程要读时,会先判断
if (!sl.validate(stamp))
validate中会先查看是否有写线程在写,然后再判断输入的值和当前的 stamp是否相同,即判断是否读线程将读到最新的数据。
如果有写线程在写,或者 stamp数值不同,则返回失败。
如果判断失败,当然可以重复的尝试去读,在示例代码中,并没有让其重复尝试读,而采用的是将乐观锁退化成普通的读锁去读,这种情况就是一种悲观的读法。
stamp = sl.readlock();
stampedlock的实现思想:
clh自旋锁:当锁申请失败时,不会立即将读线程挂起,在锁当中会维护一个等待线程队列,所有申请锁,但是没有成功的线程都记录在这个队列中。每一个节点(一个节点代表一个线程),保存一个标记位(locked),用于判断当前线程是否已经释放锁。当一个线程试图获得锁时,取得当前等待队列的尾部节点作为其前序节点。并使用类似如下代码判断前序节点是否已经成功释放锁
while (pred.locked) {
}
这个循环就是不断等前面那个结点释放锁,这样的自旋使得当前线程不会被操作系统挂起,从而提高了性能。
当然不会进行无休止的自旋,会在若干次自旋后挂起线程。
上一篇: docker 常用命令
下一篇: 常用命令