Java 并发关键字大练兵—一文读懂各个关键字
本文介绍了Threadlocal、volatile、condition、Semaphore、CountDownLatch、unsafe 等关键字
目录如下:
- Threadlocal 本地线程
- volatile
- condition
- CountDownLatch 闩锁
- CyclicBarrier 篱栅
- Semaphore 信号灯
- unsafe 魔法类
- StampedLock 新读写锁
1. Threadlocal
从名字我们就可以看到ThreadLocal叫做本地线程,意思是ThreadLocal中填充的变量属于当前线程,该变量对其他线程而言是隔离的。ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。
Java就是通过ThreadLocal
来实现线程本地存储的。
使用场景:
- 在进行对象跨层传递的时候,使用ThreadLocal可以避免多次传递,打破层次间的约束。
- 线程间数据隔离
- 进行事务操作,用于存储线程事务信息。
- 数据库连接,Session会话管理。**
1.1 ThreadLocal 结构分析
ThreadLocalMap
里面有个Entry
数组,只有数组没有像HashMap
那样有链表,因此当hash冲突的之后,ThreadLocalMap
是采用线性探测的方式解决hash冲突。
线性探测,就是先根据初始key
的hashcode
值确定元素在table
数组中的位置,如果这个位置上已经有其他key
值的元素被占用,则利用固定的算法寻找一定步长的下个位置,依次直至找到能够存放的位置。在ThreadLocalMap
步长是1。
线性探测 是通过 AtomicInteger 的原子性方法 getAndAdd 获取个位置数组中的位置,如果该位置已被占用则通过 nextIndex 方法获取下一个位置,循环直到有空位置为止。
结构如下
public class ThreadLocal<T> {
private static AtomicInteger nextHashCode =
new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
static class ThreadLocalMap {
private Entry[] table;
private int size = 0;
//继承弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
//对k加上弱引用WeakReference
super(k);
value = v;
}
}
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
//循环探测下个hashcode 的位置
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
}
}
1.2 ThreadLocalMap
ThreadLocalMap其实就是ThreadLocal的一个静态内部类(多线程共享),里面定义了一个Entry[]数组来保存数据,而且还是继承的弱引用, key为弱引用。在Entry内部使用ThreadLocal作为key,使用我们设置的value作为value。
- 每个Thread维护着一个ThreadLocalMap的引用
- ThreadLocalMap是ThreadLocal的内部类,用Entry来进行存储
- ThreadLocal创建的副本是存储在自己的threadLocals中的,也就是自己的ThreadLocalMap。
- ThreadLocalMap的键值为ThreadLocal对象,而且可以有多个threadLocal变量,因此保存在map中
- 在进行get之前,必须先set,否则会报空指针异常,当然也可以初始化一个,但是必须重写initialValue()方法。
- ThreadLocal本身并不存储值,它只是作为一个key来让线程从ThreadLocalMap获取value。
1.3 内存泄漏
内存泄漏问题:
上面这张图详细的揭示了ThreadLocal和Thread以及ThreadLocalMap三者的关系。
- 1、Thread中有一个map,就是ThreadLocalMap
- 2、ThreadLocalMap的key是ThreadLocal,值是我们自己设定的。
- 3、ThreadLocal是一个弱引用weakReference,当为null时,会被当成垃圾回收
- 4、重点来了,突然我们ThreadLocal是null了,也就是要被垃圾回收器回收了,但是此时我们的ThreadLocalMap生命周期和Thread的一样,它不会回收,这时候就出现了一个现象。那就是ThreadLocalMap的key没了,但是value还在,这就造成了内存泄漏。
解决办法:使用完ThreadLocal后,执行remove操作,避免出现内存溢出情况。
参考:
https://mp.weixin.qq.com/s/Gc1YPt_DPMNKbbE_I0jmSA
https://baijiahao.baidu.com/s?id=1653790035315010634&wfr=spider&for=pc
2. volatile
- 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。(实现可见性)
- 禁止进行指令重排序。(实现有序性)
- volatile 只能保证对单次读/写的原子性。i++ 这种操作不能保证原子性。
读理解
当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。
写理解
当写一个volatile变量时,JMM会把该线程对应的本地中的共享变量值刷新到主内存。
将当前处理器缓存行的数据写回到系统内存。这个写回内存的操作会告知在其他CPU你们拿到的变量是无效的下一次使用时候要重新共享内存拿。
使用 volatile 必须具备的条件
- 对变量的写操作不依赖于当前值。
该变量没有包含在具有其他变量的不变式中。
- 只有在状态真正独立于程序内其他内容时才能使用 volatile。
2.1 volatile 禁止指令重排
volatile是通过内存屏障来来禁止指令重排的。
内存屏障(Memory Barrier是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作。下表描述了和volatile有关的指令重排禁止行为:
内存屏障就是基于4个汇编级别的关键字来禁止指令重排的,其中volatile的规则如下:
-
当第二个操作是volatile写时,不管第一个操作是什么,都不能重排序。这个规则确保volatile写之前的操作不会被编译器重排序到volatile写之后。
-
当第一个操作是volatile读时,不管第二个操作是什么,都不能重排序。这个规则确保volatile读之后的操作不会被编译器重排序到volatile读之前。
-
当第一个操作是volatile写,第二个操作是volatile读时,不能重排序。
2.1.1 内存屏障(Memory Barrier)
StoreStore屏障:禁止上面的普通写和下面的volatile写重排序;
StoreLoad屏障:防止上面的volatile写与下面可能有的volatile读/写重排序
LoadLoad屏障:禁止下面所有的普通读操作和上面的volatile读重排序
LoadStore屏障:禁止下面所有的普通写操作和上面的volatile读重排序
CPU中,每个CPU又有多级缓存,一般分为L1,L2,L3,因为这些缓存的出现,提高了数据访问性能,避免每次都向内存索取,但是弊端也很明显,不能实时的和内存发生信息交换,分在不同CPU执行的不同线程对同一个变量的缓存值不同。
- 硬件层的内存屏障分为两种:
Load Barrier
和Store Barrier
即读屏障和写屏障。【内存屏障是硬件层的】
2.1.2 为什么需要内存屏障
由于现代操作系统都是多处理器操作系统,每个处理器都会有自己的缓存,可能存再不同处理器缓存不一致的问题,而且由于操作系统可能存在指令重排序,导致读取到错误的数据,因此,操作系统提供了一些内存屏障以解决这种问题.
简单来说:
1.在不同CPU执行的不同线程对同一个变量的缓存值不同。为了解决这个问题,用volatile可以解决上面的问题,不同硬件对内存屏障的实现方式不一样。java屏蔽掉这些差异,通过jvm生成内存屏障的指令。
2.对于读屏障:在指令前插入读屏障,可以让高速缓存中的数据失效,强制从主内存取。
2.1.3 内存屏障的作用
cpu执行指令可能是无序的,它有两个比较重要的作用
1.阻止屏障两侧指令重排序
2.强制把写缓冲区/高速缓存中的脏数据等写回主内存,让缓存中相应的数据失效。
2.2有序性
编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎是不可能的,为此,JMM采取了保守策略:
- 在每个volatile写操作的前面插入一个StoreStore屏障;写完对写可见
- 对于这样的语句Store1;
StoreStore
; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
- 对于这样的语句Store1;
- 在每个volatile写操作后面插入一个StoreLoad屏障;写完对读可见
- 对于这样的语句Store1;
StoreLoad
; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。
- 对于这样的语句Store1;
- 在每个volatile读操作的后面插入一个LoadLoad屏障;读完对读可见
- 对于这样的语句Load1;
LoadLoad
; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
- 对于这样的语句Load1;
- 在每个volatile读操作之后再插入一个LoadStore屏障。读完对写可见
- 对于这样的语句Load1;
LoadStore
; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
- 对于这样的语句Load1;
volatile通过在volatile变量的操作前后插入内存屏障的方式,来禁止指令重排,进而保证多线程情况下对共享变量的有序性。
2.3 volatile 可见性
volatile对于可见性的实现,内存屏障也起着至关重要的作用。因为内存屏障相当于一个数据同步点,他要保证在这个同步点之后的读写操作必须在这个点之前的读写操作都执行完之后才可以执行。并且在遇到内存屏障的时候,缓存数据会和主存进行同步,或者把缓存数据写入主存、或者从主存把数据读取到缓存。
操作系统中的缓存和JVM中线程的本地内存并不是一回事,通常我们可以认为:MESI可以解决缓存层面的可见性问题。使用volatile关键字,可以解决JVM层面的可见性问题。
缓存可见性问题的延伸:由于传统的MESI协议的执行成本比较大。所以CPU通过Store Buffer和Invalidate Queue组件来解决,但是由于这两个组件的引入,也导致缓存和主存之间的通信并不是实时的。也就是说,缓存一致性模型只能保证缓存变更可以保证其他缓存也跟着改变,但是不能保证立刻、马上执行。
在计算机内存模型中,也是使用内存屏障来解决缓存的可见性问题的(再次强调:缓存可见性和并发编程中的可见性可以互相类比,但是他们并不是一回事儿)。写内存屏障(Store Memory Barrier)可以促使处理器将当前store buffer(存储缓存)的值写回主存。读内存屏障(Load Memory Barrier)可以促使处理器处理invalidate queue(失效队列)。进而避免由于Store Buffer和Invalidate Queue的非实时性带来的问题。
内存屏障也是保证可见性的重要手段,操作系统通过内存屏障保证缓存间的可见性,JVM通过给volatile变量加入内存屏障保证线程之间的可见性。
2.4 volatile 与 synchronized 的比较
- volatile是变量修饰符,其修饰的变量具有可见性。
- volatile主要用在多个线程感知实例变量被更改了场合,从而使得各个线程获得最新的值。它强制线程每次从主内存中取到变量,而不是从线程的私有内存中读取变量,从而保证了数据的可见性。
- ①volatile轻量级,只能修饰变量。synchronized重量级,还可修饰方法
- ②volatile只能保证数据的可见性,不能用来同步,因为多个线程并发访问volatile修饰的变量不会阻塞。
- synchronized不仅保证可见性,而且还保证原子性,因为,只有获得了锁的线程才能进入临界区,从而保证临界区中的所有语句都全部执行。多个线程争抢synchronized锁对象时,会出现阻塞。
参考:https://www.hollischuang.com/archives/2673?spm=a2c6h.12873639.0.0.1c786ca5UcOGO5
3. Condition
在 lock 接口和 AbstractQueuedSynchronizer 中的ConditionObject类都有用到 condition 接口
interface Lock {
Condition newCondition();
}
在使用Lock之前,我们使用的最多的同步方式应该是synchronized关键字来实现同步方式了。配合Object的wait()、notify()系列方法可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。Object和Condition接口的一些对比。
condition对象是依赖于lock对象的,意思就是说condition对象需要通过lock对象进行创建出来(调用Lock对象的newCondition()方法)。但是需要注意在调用方法前获取锁。
一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。
3.1 condition常用方法
condition可以通俗的理解为条件队列。当一个线程在调用了await方法以后,直到线程等待的某个条件为真的时候才会被唤醒。这种方式为线程提供了更加简单的等待/通知模式。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
- await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
- await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
- awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
- awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
- awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回false。
- signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
- signal()All :唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
3.2 condition原理:
Condition是AQS的内部类。每个Condition对象都包含一个队列(等待队列)。等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。等待队列的基本结构如下所示。
等待分为首节点和尾节点。当一个线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。新增节点后将尾部节点换为新增的节点。节点引用更新本来就是在获取锁以后的操作,所以不需要CAS保证,也是线程安全的操作。
等待
当线程调用了await方法以后。线程就作为队列中的一个节点被加入到等待队列中去了。同时会释放锁的占用。当从await方法返回的时候。一定会获取condition相关联的锁。当等待队列中的节点被唤醒的时候,则唤醒节点的线程开始尝试获取同步状态(锁)。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException异常信息。
通知
调用Condition的signal()方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到同步队列中,即可以去竞争获取锁,。当前线程加入到等待队列中如图所示:
在调用signal()方法之前必须先判断是否获取到了锁。接着获取等待队列的首节点,将其移动到同步队列并且利用LockSupport唤醒节点中的线程。节点从等待队列移动到同步队列如下图所示:
被唤醒的线程将从await方法中的while循环中退出。随后加入到同步状态的竞争当中去。成功获取到竞争的线程则会返回到await方法之前的状态。
3.3condition 总结
调用await方法后,将当前线程加入Condition等待队列中。当前线程释放锁。否则别的线程就无法拿到锁而发生死锁。自旋(while)挂起,不断检测节点是否在同步队列中了,如果是则尝试获取锁,否则挂起。当线程被signal方法唤醒,被唤醒的线程将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。
4. CountDownLatch(闩锁)
CountDownLatch适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景。
初始化一个CountDownLatch实例传参3,因为我们有3个子线程,每次子线程执行完毕之后调用countDown()方法给计数器-1,主线程调用await()方法后会被阻塞,直到最后计数器变为0,await()方法返回,执行完毕。他和join()方法的区别就是join会阻塞子线程直到运行结束,而CountDownLatch可以在任何时候让await()返回,而且用ExecutorService没法用join了,相比起来,CountDownLatch更灵活。
CountDownLatch基于AQS实现,volatile变量state维持倒数状态,多线程共享变量可见。
- CountDownLatch通过构造函数初始化传入参数实际为AQS的state变量赋值,维持计数器倒数状态
- 当主线程调用await()方法时,当前线程会被阻塞,当state不为0时进入AQS阻塞队列等待。
- 其他线程调用countDown()时,state值原子性递减,当state值为0的时候,唤醒所有调用await()方法阻塞的线程
4.1 CountDownLatch与thread.join()的区别
join,在当前线程中,如果调用某个thread的join方法,那么当前线程就会被阻塞,直到thread线程执行完毕,当前线程才能继续执行。join的原理是,不断的检查thread是否存活,如果存活,那么让当前线程一直wait,直到thread线程终止,线程的this.notifyAll 就会被调用。
CountDownLatch中我们主要用到两个方法一个是await()方法,调用这个方法的线程会被阻塞,另外一个是countDown()方法,调用这个方法会使计数器减一,当计数器的值为0时,因调用await()方法被阻塞的线程会被唤醒,继续执行。
调用join方法需要等待thread执行完毕才能继续向下执行,而CountDownLatch只需要检查计数器的值为零就可以继续向下执行,相比之下,CountDownLatch更加灵活一些,可以实现一些更加复杂的业务场景。
5. CyclicBarrier(篱栅)
CyclicBarrier叫做回环屏障,它的作用是让一组线程全部达到一个状态之后再全部同时执行,而且他有一个特点就是所有线程执行完毕之后是可以重用的。
CountDownLatch非常相似,初始化传入3个线程和一个任务,线程调用await()之后进入阻塞,计数器-1,当计数器为0时,就去执行CyclicBarrier中构造函数的任务,当任务执行完毕后,唤醒所有阻塞中的线程。这验证了CyclicBarrier让一组线程全部达到一个状态之后再全部同时执行的效果。
CyclicBarrier 是通过 ReentrantLock加锁,控制count属性的加减计数的。也是基于AQS
可重用
每个子线程调用await()计数器减为0之后才开始继续一起往下执行,此时count会恢复到最初计数,当再次调用await()时,就得再次等到计数器为0之后就又一起往下执行,这就是可重用。
CyclicBarrier还是基于AQS实现的,内部维护parties记录总线程数,count用于计数,最开始count=parties,调用await()之后count原子递减,当count为0之后,再次将parties赋值给count,这就是复用的原理。
- 当子线程调用await()方法时,获取独占锁,同时对count递减,进入阻塞队列,然后释放锁
- 当第一个线程被阻塞同时释放锁之后,其他子线程竞争获取锁,操作同1
- 直到最后count为0,执行CyclicBarrier构造函数中的任务,执行完毕之后子线程继续向下执行
CyclicBarrier 小例子实现
CyclicBarrier可重用小例子实现
6. Semaphore(信号灯)
Semaphore叫做信号量,和前面两个不同的是,他的计数器是递增的。
稍微和前两个有点区别,构造函数传入的初始值为0,当子线程调用release()方法时,计数器递增,主线程acquire()传参为3 则说明主线程一直阻塞,直到计数器为3才会返回。
Semaphore还还还是基于AQS实现的,同时获取信号量有公平和非公平两种策略
- 主线程调用acquire()方法时,用当前信号量值-需要获取的值,如果小于0,则进入同步阻塞队列,大于0则通过CAS设置当前信号量为剩余值,同时返回剩余值
- 子线程调用release()给当前信号量值计数器+1(增加的值数量由传参决定),同时不停的尝试因为调用acquire()进入阻塞的线程
7. unsafe
AtomicInteger的自增函数incrementAndGet()的源码时,发现自增函数底层调用的是unsafe.getAndAddInt()。但是由于JDK本身只有Unsafe.class,只通过class文件中的参数名,并不能很好的了解方法的作用,所以我们通过OpenJDK 8 来查看Unsafe的源码:
// ------------------------- JDK 8 -------------------------
// AtomicInteger 自增方法
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// Unsafe.class
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
// ------------------------- OpenJDK 8 -------------------------
// Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
8. StampedLock
在Java 8中引入了一种锁的新机制——StampedLock,它可以看成是读写锁的一个改进版本。StampedLock提供了一种乐观读锁的实现,这种乐观读锁类似于无锁的操作,完全不会阻塞写线程获取写锁,从而**缓解读多写少时写线程“饥饿”**现象。由于StampedLock提供的乐观读锁不阻塞写线程获取读锁,当线程共享变量从主内存load到线程工作内存时,会存在数据不一致问题,所以当使用StampedLock的乐观读锁时,需要遵从如下图用例中使用的模式来确保数据的一致性。
小结
CountDownLatch通过计数器提供了比join更灵活的多线程控制方式,join会阻塞子线程直到运行结束,而CountDownLatch可以在任何时候让await()返回。CyclicBarrier也可以达到CountDownLatch的效果,而且有可复用的特点,Semaphore则是采用信号量递增的方式,开始的时候并不需要关注需要同步的线程个数,并且提供获取信号的公平和非公平策略。
参考:
unsafe java 魔法类
说说CountDownLatch,CyclicBarrier,Semaphore的原理?
本文地址:https://blog.csdn.net/u012373815/article/details/110927237
上一篇: 单条容量可达512GB DDR5最大优势是容量而非速度
下一篇: 终于等到今天 健康码 要大一统了!