java并发编程实战6-基础构建模块
1、同步容器类
常见的容器:Vector、HashTable、Collections.synchronizedXXX封装类等
都是线程安全的,但在某些情况下可能需要客户端加锁来保护复核操作(如A线程获取最后一个值,B线程删除最后一个元素,这两个操作可能交替执行,可能使得抛出ArrayIndexOutOfBoundsException),使结果符合期望。
迭代器表现形为使“及时失败”(fail-fast),即当迭代过程中容器被其他线程修改时,就抛出ConcurrentModificationException异常,容器内部采用了计数器与容器关联,迭代期间计数器被修改,那么next就会抛出异常。如果不希望迭代过程中抛出异常,可用的方式加锁或者克隆容器。加锁可能存在线程饥饿或死锁等情况,所以大多数情况下并不推荐。克隆容器就是在线程封闭的副本上迭代,不过在容器克隆时会有显著的性能开销,能否采用就要看容器大小、在元素上执行的操作等多个因素
容器在做toString、contianAll、removeAll、hashCode、equal时等操作时也会间接的执行迭代操作,所以也有可能抛出ConcurrentModificationException异常
2、 并发容器
常见容器:currentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue(FIFO)、BlockingQueue等
currentHashMap:使用分段锁(lock
striping)来实现最大程度的数据共享,执行读取和写入的线程可以并发的访问Map。迭代器具有弱一致性并非及时失败,一些需要计算整个Map的操作如size是一个估计值并非精确值。实现了“没有则添加”、“相等移除”’、“相等替换”等复合原子操作
CopyOnWriteArrayList:发布了事实不可变对象,访问不需要同步。每次修改时,都会创建并重新发布一个新的容器,实现可变性。适用于迭代操作远远多于修改操作。
3、阻塞队列
阻塞队列提供了可阻塞的put和take方法,并支持定时的offer和poll方法。已满的队列put阻塞至存在空余空间,为空的对了take阻塞至队列不为空
FIFO(先进先出):ConcurrentLinkedQueue、LinkedBlokingQueue
优先级队列:PriorityBlockingQueue
串行线程封闭:利用阻塞队列的内部同步机制,从而安全的将对象从生产者发布到消费者的线程 双端队列与工作密取
4、 阻塞方法与中断方法
线程阻塞状态:BLOCKED、WAITING、TIMED_WAITING
中断方法:Thread提供了interrupt方法,用于中断或查询是否中断
5、 同步工具类
闭锁:可以用来确保某些活动直到其他活动都完成才继续执行;CountDownLatch是一种灵活的闭锁实现,闭锁状态包含了一个计数器,countDown方法递减计数器,await方法等待计数器达到零。
FutureTest:get方法可以计算并获取任务的执行状态,等待运行、正在运行、执行完成3个状态,是由Callbale方式实现,也可以实现闭锁
信号量(Counting
Semaphore):用来控制访问特定资源的操作数量;可以用来实现某种资源池或容器边界;acquire方法获得许可(已满阻塞),release方法释放许可
栅栏(Barrier):所有线程必须到达栅栏位置才能继续执行或者两方栅栏交换数据
6、 构建高效且可伸缩的结果缓存
public interface Computable<A, V> {
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunction implements Computable<String, BigInteger> {
@Override
public BigInteger compute(String arg) throws InterruptedException {
return new BigInteger(arg);
}
}
public class Memoizer<A, V> implements Computable<A, V> {
private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
}catch (InterruptedException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
本文地址:https://blog.csdn.net/qiuye_liang/article/details/111937168