《Java并发编程实战》 阅读笔记 5. 基础构建模块
Java平台类库包含了丰富的并发基础构建模块,例如线程安全的容器类以及各种用于协调多个相互协作的线程控制流的同步工具类(Synchronizer)。本章将介绍其中一些最有用的并发构建模块,以及在使用这些模块来构造并发应用时的一些常用模式。
同步容器类包括Vector和Hashtable等,这些同步的封装器类是由Collections.synchronizedXxx等工厂方法创建的。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个共有方法都进行同步,使得每次只有一个线程能够访问容器的状态。
容器的toString、 hashCode、equals等方法会间接地执行迭代操作。
同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量严重降低。
在Java5.0中增加了ConcurrentHashMap,用来替代同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。
在新的Concurrent接口中增加了对一些常见复合操作的支持,例如“若没有则添加”、替换以及有条件删除等等。
通过并发容器代替同步容器,可以极大的提高伸缩性并降低风险。
Java5.0增加了两种新的容器类型:Queue和BlockingQueue。 Queue用来临时保存一组等待处理的元素。它提供了几种实现
ConcurrentLinkedQueue :先进先出队列。
PriorityQueue:(非并发)优先队列。
BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作。如果队列为空那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。如果队列已满(对于有界队列来说),那么插入元素的操作将一直阻塞,直到队列中出现可用的空间。
在“生产者-消费者”这两种设计模式中,阻塞队列是非常有用的。
Java6 也引入了ConcurrentSkipListMap和ConcurrentSkipListSet,分别作为同步的SortedMap和SortedSet的并发替代品(例如用synchronizedMap包装的TreeMap或TreeSet)。
ConcurrentHashMap
与HashMap一样,ConcurrentHashMap也是一个基于散列的Map,但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度上的共享,这种机制称为分段锁(Lock Striping)。在这种机制中,任意数量的读取线程可以并发的访问Map,执行读取操作的线程和执行写入操作的线程可以并发的访问Map,并且一定数量的写入线程可以并发的修改Map。ConcurrentHashMap带来的结果是,在并发访问环境下将实现更高的吞吐量,而在单线程环境中只损失非常小的性能。
它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。
CopyOnWriteArrayList
CopyOnWriteArrayList用于替代同步List,在迭代期间不需要对容器进行加锁或复制。(类似的CopyOnWriteArraySet的作用是替代同步Set)。
“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变对象,那么再访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。多个线程可以同时对这个容器进行迭代(容器的迭代器保留一个指向底层基础数组的引用,这数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需要确保数组内容的可见性),它们提供的迭代器不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。
显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是容器规模较大时,仅当迭代操作远远多于修改操作时,才应当使用“写入时复制”容器。(事件通知系统:在分发通知时需要跌打已注册监听链表,并调用每一个监听器,在大多数情况***册和注销时间监听器的操作远少于接收事件通知的操作。)
队列可以是有界的也可以是*的,*队列永远都不会满,因此*队列上的put方法也永远不会阻塞。
在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。
同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。
闭锁
CountDownLatch是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生。如果计数器的值非0,那么await会一直阻塞直到计数器为0,或者等待线程的中断,或者等待超时。
import java.util.concurrent.CountDownLatch;
public class TestCountDownLatch {
private static int number = 1000000;
/**
* 测试n个线程并发执行某个线程所需要的时间。
* @param nThreads
* @param task
* @return
* @throws InterruptedException
*/
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
public static void main(String[] args) throws InterruptedException {
Runnable task = new Runnable() {
@Override
public void run() {
while(number > 0) {
System.out.println(Thread.currentThread().getName() + "---------"+ number--);
}
}
};
TestCountDownLatch test = new TestCountDownLatch();
long time = test.timeTasks(8, task); // 4 4898865s 1 3791084s 6236428991s 5919032207s
System.out.println(time + "s");
}
}
FutureTask也可以用做闭锁。FutrureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态,等待运行,正在运行和运行完成。当FutureTask进入完成状态后,它会永远停止在这个状态上。
Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传承过程能实现结果的安全发布。
FutureTask在Executor框架中表示异步任务,此外还可以表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。
信号量
Semaphore 中管理着一组许可,许可的初始数量可以通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可) ,并在使用后以释放许可,如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断,或者操作超时)。release方法将返回一个许可信号量。
计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用作互斥体(mutex),并且具备不可重入的加锁语义:谁拥有了这个许可,谁有拥有了互斥锁。
Semaphore可以用于实现资源池,例如数据库连接池。如果将Semaphore的计数量初始化为池的大小,并在从池中获取了一个资源之前首先调用acquire方法获取一个许可,在将资源返回给池之后调用realease释放许可,那么acquire将一直阻塞直到资源池不为空。
(在构造阻塞对象池时,一种更简单的方法是使用BlockingQueue来保存池的资源。)
同样,也可以使用Semaphore将任何一种容器变成有界阻塞容器。
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<>(bound));
this.sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {
sem.release();
}
}
}
public boolean remove(T o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
sem.release();
}
return wasRemoved;
}
}
栅栏(Barrier)
CyclicBarrier可以使一定数量的参与方反复的在栅栏位置汇聚,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功的通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。
上一篇: 5.标签管理