并发工具类(二)
1.写在前面
一个长假终于过完了,休息了8天,什么事也没有做,突然想想我是一个不称职的笔者,这个长假竟然一篇博客没有写。体重也重了不少,感觉整个人都是灰暗。好好调整状态,继续起航。上篇博客大概讲了下并发工具类的一部分。今天我们来讲下剩下的部分。
2.StampedLock
2.1StampedLock支持的三种锁模式
ReadWriteLock支持两种模式:一种是读锁,一种是写锁
StampedLock支持三种模式:写锁、悲观读锁、乐观锁。其中,写锁、悲观读锁的语义和ReadWriteLock的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock里的写锁和悲观读锁加锁成功之后,都会返回一个stamp;然后解锁的时候,需要传入这个stamp。
final StampedLock sl = new StampedLock();
// 获取释放悲观读锁示意代码
long stamp = sl.readLock();
try {
// 省略业务相关代码
} finally {
sl.unlockRead(stamp);
}
// 获取释放写锁示意代码
long stamp = sl.writeLock();
try {
// 省略业务相关代码
} finally {
sl.unlockWrite(stamp);
}
StampedLock的性能之所以比ReadWriteLock还要好,其关键是StampedLock支持乐观读的方式。ReadWriteLock支持多个线程同时读,但是当多个线程同时读的时候,所有的写都会被阻塞;而StampedLock提供的乐观锁,是允许一个线程获取写锁的,也就是说不是所有的写都被阻塞。tryOptimisticRead() 就是我们前面提到的乐观读。
class Point {
private int x, y;
final StampedLock sl = new StampedLock();
// 计算到原点的距离
int distanceFromOrigin() {
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入局部变量,
// 读的过程数据可能被修改
int curX = x, curY = y;
// 判断执行读操作期间,
// 是否存在写操作,如果存在,
// 则 sl.validate 返回 false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(curX * curX + curY * curY);
}
}
如果执行乐观读操作的期间,存在写操作,会把乐观读升级成悲观读锁。这个做法挺合理的,否则你就需要在一个循环里反复执行乐观读,直到执行乐观读操作的期间没有写的操作(只有这样才能保证 x 和 y 的正确性和一致性),而循环读会浪费大量的 CPU。
2.2进一步理解乐观读
我们先来看看数据库中乐观锁,假设数据库中有一个生产订单的表product_doc,这个时候我们在这个表里增加一个数值型的版本号字段version,每次更新product_doc这个表的时候,都将version字段加1.生产订单的UI在展示的时候,需要查询数据库,此时将这个version字段和其他业务字段一起返回给生产订单UI。假设用户查询的生产订单的id=777,那么SQL语句类似下面这样:
select id,... ,version
from product_doc
where id=777
用户在生产订单 UI 执行保存操作的时候,后台利用下面的 SQL 语句更新生产订单,此处我们假设该条生产订单的 version=9。
update product_doc
set version=version+1,...
where id=777 and version=9
如果这条 SQL 语句执行成功并且返回的条数等于 1,那么说明从生产订单 UI 执行查询操作到执行保存操作期间,没有其他人修改过这条数据。因为如果这期间其他人修改过这条数据,那么版本号字段一定会大于 9。
你会发现数据库里的乐观锁,查询的时候需要把 version 字段查出来,更新的时候要利用version 字段做验证。这个 version 字段就类似于 StampedLock 里面的 stamp。这样对比着看,相信你会更容易理解 StampedLock 里乐观读的用法。
2.3StampedLock使用注意事项
StampedLock的功能仅仅是ReadWriteLock的子集。StampedLock不支持重入。StampedLock的悲观读锁、写锁都不支持条件变量。如果线程阻塞在 StampedLock 的 readLock() 或者writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。 使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()。
2.4总结
StampedLock读模板
final StampedLock sl = new StampedLock();
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入方法局部变量
// ......
// 校验 stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
//.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 使用方法局部变量执行业务操作
// ......
StampedLock写模板
long stamp = sl.writeLock();
try {
// 写共享变量
// ......
} finally {
sl.unlockWrite(stamp);
}
3.CountDownLatch与CyclicBarrier
3.1从对账开始谈起
具体的业务:用户通过在线商城下单,会生成电子单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。大概的流程就是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。具体的如下图:
再对上面的流程图进行抽象成代码,具体的如下:
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
3.2利用并行优化对账系统
由于查询未对账订单和查询派送单两个操作之间没有任何依赖,所以这个操作可以直接并行的操作。我们创建了两个线程 T1和 T2,并行执行查询未对账订单 getPOrders() 和查询派送单 getDOrders() 这两个操作。在主线程中执行对账操作 check() 和差异写入 save() 两个操作。不过需要注意的是:主线程需要等待线程 T1 和 T2 执行完才能执行 check() 和 save() 这两个操作,为此我们通过调用 T1.join() 和 T2.join() 来实现等待,当 T1 和 T2 线程退出时,调用 T1.join() 和T2.join() 的主线程就会从阻塞态被唤醒,从而执行之后的 check() 和 save()。具体的代码如下:
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
3.3使用CountDownLatch实现线程等待
上面的代码你会发现每次都要创建线程,读者都知道创建线程是一个耗时的操作,我们尽量避免多次创建线程,于是这儿你就想到了用线程池的办法。可是一旦使用线程池的办法,join就会失效,因为在线程池的方案里,线程根本就不会退出,所以join的方法已经失效。这个时候我们该怎么办呢?这个时候我们可以使用jdk自带的工具类CountDownLatch,在while 循环里面,我们首先创建了一个 CountDownLatch,计数器的初始值等于 2,之后在pos = getPOrders();和dos =getDOrders();两条语句的后面对计数器执行减 1操作,这个对计数器减 1 的操作是通过调用 latch.countDown(); 来实现的。在主线程中,我们通过调用 latch.await() 来实现对计数器等于 0 的等待。
// 创建 2 个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为 2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
3.4进一步优化性能
两次查询操作能够和对账操作并行,对账操作还依赖查询操作的结果,这明显有点生产者 - 消费者的意思,两次查询操作是生产者,对账操作是消费者。既然是生产者 - 消费者模型,那就需要有个队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。 订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。
最直接的想法就是:一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。
3.5用 CyclicBarrier 实现线程同步
上面的方案的难点有两个:一个是线程T1和T2要做到步调一致,另一个是要能够通知到线程T3
我们可以使用CyclicBarrier,在下面的代码中,我们首先创建了一个计数器初始值为 2 的CyclicBarrier,你需要注意的是创建 CyclicBarrier 的时候,我们还传入了一个回调函数,当计数器减到 0 的时候,会调用这个回调函数。
线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询派送单,当查出一条时,也调用 barrier.await()来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。非常值得一提的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值。这个功能用起来实在是太方便了。
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
3.6总结
CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。
4.并发容器
4.1同步容器及其注意事项
Java中的容器主要可以分为四大类,分别是List、Map、Set和Queue,但并不是所有的Java容器都是线程安全的。
容器领域中一个容易被忽视的坑是用迭代器遍历容器。我们提到的这些经过包装后线程安全容器,都是基于synchronized这个同步关键字实现的,所以也被称为同步容器。
Java 提供的同步容器还有 Vector、Stack 和 Hashtable,这三个容器不是基于包装类实现的,但同样是基于 synchronized 实现的,对这三个容器的遍历,同样要加锁保证互斥。
4.2并发容器及其注意事项
Java在1.5版本之前所谓的线程安全的容器,主要指的是同步容器。不过同步容器有个最大的问题,那就是性能差,所有方法都用 synchronized 来保证互斥,串行度太高了。因此 Java 在 1.5 及之后版本提供了性能更高的容器,我们一般称为并发容器。
4.21List
CopyOnWriteArrayList ,CopyOnWrite,顾名思义就是写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。实现原理:CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。
如果在遍历 array 的同时,还有一个写操作,例如增加元素,CopyOnWriteArrayList 是如何处理的呢?CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。通过下图你可以看到,读写是可以并行的,遍历操作一直都是基于原 array 执行,而写操作则是基于新 array 进行。
注意:
- 一个是应用场景,CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。
- 另一个需要注意的是,CopyOnWriteArrayList 迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照,而对快照进行增删改是没有意义的。
4.22Map
ConcurrentHashMap 的 key 是无序的,而ConcurrentSkipListMap 的 key 是有序的。 使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key和 value 都不能为空,否则会抛出NullPointerException这个运行时异常。
集合类 | Key | Value | 是否线程安全 |
---|---|---|---|
HashMap | 允许为null | 允许为null | 否 |
TreeMap | 不允许为null | 允许为null | 否 |
HashTable | 不允许为null | 不允许为null | 是 |
ConcurrentHashMap | 不允许为null | 不允许为null | 是 |
ConcurrentSkipListMap | 不允许为null | 不允许为null | 是 |
4.23Set
Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景可以参考前面讲述的 CopyOnWriteArrayList 和 ConcurrentSkipListMap,它们的原理都是一样的,这里就不再赘述了。
4.24Queue
从两个维度来分类:
- 一个维度是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。
- 另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。
- Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识。
细分:
- 单端阻塞队列:其实现有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue 和
DelayQueue。内部一般会持有一个队列,这个队列可以是数组(其实现是ArrayBlockingQueue)也可以是链表(其实现是 LinkedBlockingQueue);甚至还可以不持有队列(其实现是 SynchronousQueue),此时生产者线程的入队操作必须等待消费者线程的出队操作。而 LinkedTransferQueue 融合 LinkedBlockingQueue 和SynchronousQueue 的功能,性能比 LinkedBlockingQueue 更好;PriorityBlockingQueue 支持按照优先级出队;DelayQueue 支持延时出队。 - 双端阻塞队列:其实现是 LinkedBlockingDeque。
- 单端非阻塞队列:其实现是 ConcurrentLinkedQueue。
- 双端非阻塞队列:其实现是 ConcurrentLinkedDeque。
注意:只有 ArrayBlockingQueue 和LinkedBlockingQueue 是支持有界的,所以在使用其他*队列时,一定要充分考虑是否存在导致 OOM 的隐患。
5.写在最后
路漫漫其修远兮,吾将上下而求索。国庆过后就是不想上班,同时也很迷茫,可能每个25岁之前的人都是这样的。生活中总有一些东西会打乱你的生活的节奏,你不够坚定,所以你可能会捡了西瓜丢了玉米。漫漫人生路。你也会为了孤独而去去寻找一些刺激吗?总之,学习所带来的回馈非常低。多巴胺的刺激最少了。但是还是要坚持下去。