JAVA并发-并发框架(一)
java并发的一系列框架和技术主要是由java.util.concurrent 包所提供。包下的所有类可以分为如下几大类:
- locks部分:显式锁(互斥锁和速写锁)相关;
- atomic部分:原子变量类相关,是构建非阻塞算法的基础;
- executor部分:线程池相关;
- collections部分:并发容器相关;
- tools部分:同步工具相关,如信号量、闭锁、栅栏等功能
整体实现技术可按照依赖级别分为以下三层:
高层类 |
lock 同步工具 并发容器 executor/executorcompletionservice |
基础类 |
aqs 非阻塞数据结构 原子变量类 |
底层原理 |
volatile变量的读/写 cas |
一、volatile
保证线程之间操作的可见性,避免操作的重排序,但不保证原子性(i++)。
由于 java 内存模型(jmm)规定,所有的变量都存放在主内存中,而每个线程都有着自己的工作内存(高速缓存)。
在当前的java内存模型下,线程可以把变量保存在本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致。
volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值;当成员变量发生变化时,强迫线程将变化值回写到共享内存。由于使用 volatile 屏蔽掉了 vm 中必要的代码优化,所以在效率上会低效一些。
二、cas无锁算法
cas(comapre and swap):cas是一项乐观锁技术,其核心思想为冲突检测和数据更新。java对于cas支持是利用sun.misc.unsafe这个类通过jni调用cpu底层指令实现unsafe.compareandswapint操作。其主要实现思路为:将内存值v与旧的预期值e,如果相同则更新为u,不相同则由上层系统循环获取实际值后,再次调用此cas算法。最主要的应用就在原子变量类的具体实现中。
|
将内存值v与旧的预期值e,如果相同则更新为u,不相同,返回内存值给用户。
|
但cas存在问题:aba问题/循环时间长开销大/只能保证一个共享变量的原子操作 aba:如栈结构(a-b),线程t1要移除a,b变栈顶,,线程t2将结构变为a-c-d,,此时t1进行cas冲突检测发现a没变,但实际上整个栈结构变了,此时进行操作会覆盖掉c-d,解决办法是使用原子类atomicstampedreference来保证整个栈的一致性。
对于线程冲突较轻,使用cas能够避免加锁和释放锁的操作,消耗cpu资源。 对于线程冲突较重,cas容易产生自旋,即不停比较然后失败重试,浪费cpu。 |
三、aqs及其他同步工具
abstractqueuedsynchronizer抽象队列同步器,定义了多线程访问资源的同步框架。用于构建锁和其他同步组件countdownlatch等的基本框架。使用一个volatile(代表共享资源)来维护状态,通过内置的fifo队列来完成资源获取线程的排队工作。如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制aqs是用clh队列(带头节点的双向非循环链表)锁实现的,即将暂时获取不到锁的线程加入到队列中。
aqs两类应用场景:exclusive(资源独占,只有一个线程能执行,如reentrantlock)和share(资源共享,多个线程可同时执行,如semaphore/countdownlatch)
1.countdownlatch: 倒计时器,同步工具类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。在countdownlatch 上定义两种操作:countdown.countdown表示该线程工作结束(计数器减一)、countdown.await当前线程阻塞,等待其他工作线程结束(计数器为0)
2.cyclicbarrier:线程屏障,同步工具类,允许一组线程相互之间等待,达到一个共同点,再继续执行。能够重置并多次使用,并且能够获取阻塞线程数量;不会阻塞主线程。
3.semaphore:信号量,用于控制线程的并发数量。在信号量上我们定义两种操作: acquire(获取) 和 release(释放)。当一个线程调用acquire操作时,它要么通过成功获取许可(许可减1),要么一直等下去,直到有线程释放许可,或超时。release(释放)实际上会将许可的值加1,然后唤醒等待的线程。
使用seamphore(2),创建了多少线程(5),实际就会有多少线程执行(5),只是可同时执行的线程数量会受到限制(2)。但使用线程池(2,5),不管你创建多少线程实际可执行的线程数是一定的(2)。
4.exchanger:交换者,实现线程间的相互数据交换或通信。提供一个同步点,当两个线程都达到该同步点时,则进行交换数据,可以多个进行随机交换,但必须为偶数个。无锁,通过循环 cas 来实现线程安全。eg:string data2 = (string) exchanger.exchange(data1)。
四、非阻塞数据结构
同步集合:vector、hashtable,同步集合包装类/collections.synchronizedmap()和collections.synchronizedlist()---同步集合会对整个may或list加锁。
concurrenthashmap:hashmap的并发级别,通过继承自reentrantlock的segment来对hahs表进行分段锁,提高了并发效率
concurrentqueue也是通过同样的方式来提高并发性能的,子类concurrentlinkedqueue。例子:多线程卖票。
copyonwritearraylist:写时复制容器,复制该容器进行写操作,将当前容器进行copy,复制出一个新的容器,然后向新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对copyonwrite容器进行并发的读,而不需要加锁,因为在当前读的容器中不会添加任何元素。所以copyonwrite容器是一种读写分离的思想,读和写对应不同的容器。
blockingqueue:阻塞队列,并发容器,没有元素时-读取会堵塞,元素满时-加元素会阻塞。适合消费者生产者模式,其中executorcompletionservice就是将linkedblockingqueue和executor结合管理线程返回结果。
arrayblockingqueue |
有界队列,使用一个reentrantlock 锁。 |
linkedblockingqueue |
*队列,内部使用reentrantlock实现插入锁(putlock)和取出锁(takelock),fixedthreadpool用的这个。 |
delayqueue |
其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。但是要注意的是,不能将null元素放置到这种队列中.eg:处理超时的客户端链接、超时的缓存对象 |
上一篇: JavaScript switch语句
下一篇: kafka入门学习笔记(二)