Java并发编程实战(学习笔记十一 第十二章 并发程序的测试)
并发测试大致分为两类,即安全性测试与活跃性测试。
安全性定义为“不发生任何错误的行为”,而活跃性定义为“某个良好的行为终究会发生”。
在进行安全性测试时,通常会采用不变性条件的形式,即判断某个类是否与其规范保持一致。
与活跃性测试的是性能测试,性能可以通过多个方面来衡量,包括:
①吞吐量(Throughput):指一组并发任务中已完成任务所占的比例。
②响应性(Responsiveness):指请求从出来到完成之间的时间(也称为延迟(latency))
③可伸缩性(Scalability):指在增加更多资源的情况下(通常指CPU),吞吐量(或缓解短缺)的提升情况。
12.1 正确性测试(Testing for Correctness)
在为某个并发类设计单元测试时,首先需要执行与测试串行类时相同的设计——找出需要检查的不变性条件和后验条件。
我们将构建一组测试用例来测试一个有界缓存,12-1其中使用Semaphore来实现缓存的有界属性和阻塞行为。
BoundedBuffer实现了一个固定长度的队列,其中定义了可阻塞的put和take方法,并通过两个计数信号量进行控制。
信号量availableItems表示可以从缓存中删除的元素个数,它的初始值为0(因为缓存的初始状态为空)
信号量availableSpace表示可插入到缓存的元素个数,它的初始值等于缓存的大小。
take操作会首先请求从availableItems中获得一个许可(Permit)。如果缓存不为空,那么这个请求会立即成功,否则请求将阻塞直到缓存不为空。在获得一个许可后,take方法将删除缓存中的下一个元素,并返回一个许可到availableSpaces信号量。
put方法的执行顺序正好相反,因此无论是从put方法还是从take方法中退出,这两个信号量的加和都等于缓存的大小。
// 12-1 基于信号量的有界缓存
@ThreadSafe
public class BoundedBuffer<E>{
//信号量availableItems表示可以从缓存中删除的元素个数,它的初始值为0(因为缓存的初始状态为空)
//信号量availableSpace表示可插入到缓存的元素个数,它的初始值等于缓存的大小。
private final Semaphore availableItems,availableSpaces;
private final E[] items; //缓存
private int putPosition=0, takePosition=0;
public BoundedBuffer(int capacity){ //capacity 容量
availableItems=new Semaphore(0);
availableSpaces=new Semaphore(capacity);
items=(E[]) new Object[capacity];
}
public boolean isEmpty(){
//返回最近的准许可获得的信号量的数值
return availableItems.availablePermits()==0;
}
public boolean isFull(){
return availableSpaces.availablePermits()==0;
}
public void put(E x) throws InterruptedException{
availableSpaces.acquire(); //从信号量获得一个许可,阻塞直到有空或线程被中断 ,availableSpace的信号量会变小
doInsert(x);
availableItems.release(); //释放一个许可并将其返回给信号量
}
/*
* take操作会首先请求从availableItems中获得一个许可(Permit)。所以availableItems的信号量会变小
* 如果缓存不为空,那么这个请求会立即成功,否则请求将阻塞直到缓存不为空。
* 在获得一个许可后,take方法将删除缓存中的下一个元素,并返回一个许可到availableSpaces信号量。
*/
public E take() throws InterruptedException{
availableItems.acquire(); //如果缓存不为空,那么这个请求会立即成功,否则请求将阻塞直到缓存不为空。
E item=doExtract(); //extract 获得
availableSpaces.release();
return item;
}
private synchronized void doInsert(E x){
int i=putPosition;
items[i]=x;
putPosition =(++i==items.length)?0:i; //如果++后等于队列长度,此时items[i]越界,所以返回0,否则返回i
}
private synchronized E doExtract(){
int i =takePosition;
E x=items[i];
items[i]=null; //取出后清空
takePosition=(++i==items.length)?0:i; //如果++后等于队列长度,此时items[i]越界,所以返回0,否则返回i
return x;
}
}
在实际情况中,如果需要一个有界缓存,应该直接使用ArrayBlockingQueue或LinkedBlockingQueue,而不是自己编写。
12.1.1 基本的单元测试
BoundedBuffer的最基本单元测试类似与在串行上下文中执行的测试。
首先创建一个有界缓存,然后调用它的各个方法,并验证它的不变性条件和后验条件。
不变性条件:新建立的缓存应该是空的。
另一个安全测试是,将N个元素插入到容量为N的缓存中,然后测试缓存是否已经填满(不为空)。
12-2给出了这些属性的JUnit测试方法。
// 12-2 BoundedBuffer的基本单元测试
public class BoundedBufferTest extends TestCase{ //junit.framework.TestCase
//新建立的缓存应该是空的
void testIsEmptyWhenConstructed(){
BoundedBuffer<Integer> bb=new BoundedBuffer<Integer>(10);
assertTrue(bb.isEmpty()); //assert断言,假设情况为真,如果失败则抛出AssertionFailedError
assertFalse(bb.isFull());
}
//另一个安全测试是,将N个元素插入到容量为N的缓存中,然后测试缓存是否已经填满(不为空)。
void testIsFullAfterPuts() throws InterruptedException{
BoundedBuffer<Integer> bb=new BoundedBuffer<Integer>(10);
for(int i=0;i<10;i++)
bb.put(i);
assertTrue(bb.isFull());
assertFalse(bb.isEmpty());
}
}
12.1.2 对阻塞操作的测试
在测试并发的基本属性时,需要引入多个线程。
在java.util.concurrent的一致性测试中,一定要将各种故障与特定的测试明确关联起来。TestCase中定义了一些方法可以在testDown期间传递和报告失败信息,并遵循一个约定:每个测试必须等待它所创建的全部线程结束以后才能完成。
在测试方法的阻塞行为时,将引入额外的复杂性:当方法被成功地阻塞后,还必须使方法解除阻塞。
实现这个功能的一种简单方式是使用中断——在一个单独的线程中启动一个阻塞操作,等到线程阻塞后再中断它,然后宣告阻塞操作成功。这要求阻塞方法通过提前返回或抛出InterruptedException来响应中断。
12-3给出了一种测试阻塞操作的方法。
这种方法会创建一个“获取”线程,该线程将尝试从空缓存中获取一个元素。如果take方法成功,则表示测试失败。
执行测试的线程启动“获取”线程,等待一段时间,然后中断该线程。
如果“获取”线程正确地在take方法中阻塞,将抛出InterruptedException(interrupt方法),而捕获到这个异常的catch块将这个异常视为测试成功,并让线程退出。
然后,主测试线程尝试与“获取”线程合并,通过调用Thread.isAlive来验证join方法是否成功返回,如果“获取”线程可以响应中断,那么join能很快完成。
// 12-3 测试阻塞行为以及对中断的响应性
void testTakeBlocksWhenEmpty(){
final BoundedBuffer<Integer> bb=new BoundedBuffer<Integer>(10);
//如果“获取”线程正确地在take方法中阻塞,将抛出InterruptedException,而捕获到这个异常的catch块将这个异常视为测试成功,并让线程退出。
Thread taker=new Thread(){ //创建一个“获取”线程,该线程将尝试从空缓存中获取一个元素。
public void run(){
try{
int unused=bb.take();
//如果take方法成功,则表示测试失败。因为我们希望得是当缓存为空时take操作会阻塞直到不为空
fail(); //如果执行到这里,那么表示出现了一个异常
}catch (InterruptedException success) {
}
}
};
try{
taker.start(); //执行测试的线程启动“获取”线程
Thread.sleep(LOCKUP_DETECT_TIMEOUT); //等待一段时间
taker.interrupt(); //然后中断该线程
//然后,主测试线程尝试与“获取”线程合并,通过调用Thread.isAlive来验证join方法是否成功返回,
//如果“获取”线程可以响应中断,那么join能很快完成。
taker.join(LOCKUP_DETECT_TIMEOUT); //等待一段时间直到这个线程死亡,把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。
assertFalse(taker.isAlive()); //测试线程是否存活
}catch (Exception unexpected) {
fail(); //Fails a test with no message.
}
}
如果take操作由于某种意料之外ed原因停滞了,那么支持限时的join方法能确保测试最终完成。
这个测试了take的多种属性——不仅能阻塞,而且在中断后还能抛出InterruptedException。在这种情况下,最好是对Thread进行子类化而不是使用线程池中的Runnable,即通过join来正确地结束测试。
当主线程将一个元素放入队列后,“获取”线程应该解除阻塞状态,要测试这种行为,也可以使用相同的方法。
12.1.3 安全性测试
12-2和12-3的测试用例测试了有界缓存的一些重要属性,但它们无法发现由于数据竞争而引来的错误。
要想测试一个并发类在不可预测的并发访问情况下能否正确执行,需要创建多个线程来分别执行put和take操作,并在执行一段时间后判断在测试中是否会出现问题。
要测试在生产者——消费者模式中使用的类,一种好的方法是,通过一个对顺序敏感的校验和计算函数来计算所有入列元素以及出列元素的校验和,并进行比较,如果二者相等,那么测试就是成功的。如果只有一个生产者将元素放入缓存,同时也只有一个消费者从中取出元素,那么这种方法就能发挥最大的作用,因为它不仅能测试出是否取出了正确的元素,而且还能测试出元素被取出的顺序是否正确。
将这种方法扩展到多生产者——多消费者的模式,就需要一个对元素入列/出列顺序不敏感的校验和函数,从而在测试程序运行完毕以后,可以将多个校验和以不同的顺序组合起来。
要确保测试程序能正确地测试所有要点,就一定不能让编译器可以预先猜测到校验和的值。要避免这个问题,应该采用随机方式生成的数据。
12-4的xorShift,该函数基于hashCode和nanoTime来生成随机数,所得的结果既是不可预测的,而且基本上每次运行都不相同。
// 12-4 适合在测试中使用的随机数生成器
static int xorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}
12-5和12-6的PutTakeTest中启动了N个生产者线程来生成元素并把它们插入到队列,同时还启动了N个消费者线程从队列中取出元素。当元素进出队列时,每个线程都会更新对这些元素计算得到的校验和,每个线程都拥有一个校验和,并在测试结束后将它们合并起来,从而在测试缓存时不会引入过多的同步或竞争。
创建线程与启动线程等操作可能需要较大开销,如果线程执行时间很短,并在循环中启动了大量的这种线程,那么最坏情况是,这些线程将会串行执行而不是并发执行。
5.5.1中介绍了一种缓解这个问题的方法,即使用两个CountDownLatch,其中一个作为开始阀门,另一个作为结束阀门。使用CyclicBarrier可以获得同样的效果:在初始化CyclicBarrier时将计数值指定为工作者线程的数量再加1,并在运行开始和结束时,使工作者线程和测试线程都在这个栅栏处等待。
这能确保所有线程在开始执行任何工作之前,都首先执行到同一个位置。PutTakeTest使用这项技术来协调工作者线程的启动和停止,从而产生更多的并发交替操作。我们仍无法确保调度器不会采用串行方式来执行每个线程,但只要这些线程的执行时间足够长,就能降低调度机制对结果的不利影响。
PutTakeTest使用了一个确定性的结束条件,从而判断测试何时完成时就不需要在线程之间执行额外的协调。test方法将启动相同数量的生产者线程和消费者线程,它们将分别插入(put)和取出(take)相同数量的元素,因为添加和删除的总数相同。
// 12-5 测试BoundedBuffer的生产者——消费者程序
public class PutTakeTest extends TestCase {
protected static final ExecutorService pool = Executors.newCachedThreadPool();
protected CyclicBarrier barrier;
protected final SemaphoreBoundedBuffer<Integer> bb; //CyclicBarrier和CountDownLatch一样,都是关于线程的计数器。
protected final int nTrials, nPairs; //nTrials为测试次数,nPairs表示有多少对生产者——消费者
protected final AtomicInteger putSum = new AtomicInteger(0);
protected final AtomicInteger takeSum = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
new PutTakeTest(10, 10, 100000).test(); //示例参数
pool.shutdown();
}
public PutTakeTest(int capacity, int npairs, int ntrials) {
this.bb = new SemaphoreBoundedBuffer<Integer>(capacity);
this.nTrials = ntrials;
this.nPairs = npairs;
//在初始化CyclicBarrier时将计数值指定为工作者线程的数量再加1,工作者线程为npairs个生产者线程和npairs个消费者线程
this.barrier = new CyclicBarrier(npairs * 2 + 1);
}
void test() {
try {
for (int i = 0; i < nPairs; i++) {
pool.execute(new Producer());
pool.execute(new Consumer());
}
barrier.await(); //等待所有线程就绪
barrier.await(); //等待所有的线程执行完成
assertEquals(putSum.get(), takeSum.get()); //假设这两个值是相等的,如果不相等会返回一个Assert Error
} catch (Exception e) {
throw new RuntimeException(e);
}
}
static int xorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}
// 12-6 在PutTakeTest中使用的Producer和Consumer类
//并在运行开始和结束时,使工作者线程和测试线程都在这个栅栏处等待。
//这能确保所有线程在开始执行任何工作之前,都首先执行到同一个位置。
class Producer implements Runnable {
public void run() {
try {
int seed = (this.hashCode() ^ (int) System.nanoTime()); //^为异或操作
int sum = 0;
barrier.await();
for (int i = nTrials; i > 0; --i) {
bb.put(seed);
sum += seed;
seed = xorShift(seed); //该函数基于hashCode和nanoTime来生成随机数
}
putSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
class Consumer implements Runnable {
public void run() {
try {
barrier.await();
int sum = 0;
for (int i = nTrials; i > 0; --i) {
sum += bb.take();
}
takeSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
像PutTakeTest这种测试能很好地发现安全性问题。例如,在实现由信号量控制的缓存时,一个常见的错误就是在执行插入和取出的代码中忘记实现互斥行为(可以使用synchronized或ReentranLock)。如果在BoundedBuffer中忘记将doInsert和doExtract声明为synchronized,那么在运行PutTakeTest时会立即失败。
通过多个线程来运行PutTakeTest,并且使这些线程在不同系统上的不同容器的缓存上迭代数百万次,使我们能进一步确定在put和take方法中不存在数据破坏问题。
这些测试应该放在多处理器的系统上运行,从而进一步测试更多形式的交替运行。
12.1.4 资源管理的测试
测试的另一个方面就是要判断类中是否没有做它不应该做的事情,例如资源泄漏。
对于任何持有或管理其他对象的对象,都应该在不需要这些对象时销毁对它们的引用。
这种存储资源泄漏不仅会妨碍垃圾回收期回收内存,而且还会导致资源耗尽以及应用程序失败。
对于BoundedBuffer来说,资源管理的问题尤为重要。之所以要限制缓存的大小,其原因就是要防止由于资源耗尽而导致应用程序发生故障。
通过一些测量应用程序中内存使用的堆检查工具,可以容易地测试出内存的不合理占用。
12-7中的testLeak方法中包含了一写堆分析工具用于抓取堆的快照,这将强制执行一次垃圾回收(System.gc只是建议JVM在合适时刻执行垃圾回收),然后记录堆大小和内存用量的信息。
// 测试资源泄漏
private static final int CAPACITY = 10000;
private static final int THRESHOLD = 10000; //阈值
class Big{
double[] data=new double[100000];
}
//testLeak方法将多个大型对象插入到一个有界缓存中,然后再将它们移除。
void testLeak() throws InterruptedException{
BoundedBuffer<Big> bb=new BoundedBuffer<Big>(CAPACITY);
int heapSize1=snapshotHeap(); //生成堆的快照
for(int i=0;i<CAPACITY;i++)
bb.put(new Big());
for(int i=0;i<CAPACITY;i++)
bb.take();
int heapSize2=snapshotHeap();
//第2个堆快照中的内存用量应该与第1个堆快照中的内存用量基本相同。
assertTrue(Math.abs(heapSize1-heapSize2)<THRESHOLD);
}
private int snapshotHeap() {
//生成堆的快照
return 0;
}
testLeak方法将多个大型对象插入到一个有界缓存中,然后再将它们移除。
第2个堆快照中的内存用量应该与第1个堆快照中的内存用量基本相同。
然而,doExtract如果忘记将返回元素的引用置为空(items[i]=null),那么在两次快照中报告的内存用量将明显不同。
12.1.5 使用回调(Using Callbacks)
在构造测试案例时,对客户提供的代码进行回调是非常有帮助的。
回调函数的执行通常时在对象生命周期的一些已知位置上,并且在这些位置上非常适合判断不变性条件是否被破坏。
通过使用自定义的线程工厂,可以对线程的创建过程进行控制。12-8的TestingThreadFactory中将记录已创建线程的数量。这样,在测试过程中,测试方案可以验证已创建线程的数量。
我们还可以对TestingThreadFactory进行扩展,,使其返回一个自定义的Thread,并且该对象可以记录自己在何时结束,从而在测试方案中验证线程与被回收时是否于执行策略一致。
// 12-8 测试ThreadPoolExecutor的线程工厂类
class TestingThreadFactory implements ThreadFactory{
public final AtomicInteger numCreated=new AtomicInteger(); //记录已创建线程的数量
private final ThreadFactory factory=
Executors.defaultThreadFactory();
public Thread newThread(Runnable r){
numCreated.incrementAndGet();
return factory.newThread(r);
}
}
如果线程池的基本大小小于最大大小,那么线程池会根据执行需求相应增长。当把一些运行时间较长的任务提交给线程池时,线程池中的任务数量在长时间内都不会变化,这就可以进行一些判断,例如测试线程池是否能够按照预期的方式扩展。
// 12-9 验证线程池扩展能力的测试方法
public class TestThreadPool extends TestCase{
private final TestingThreadFactory threadFactory = new TestingThreadFactory();
public void testPoolExpansion() throws InterruptedException{
int MAX_SIZE=10;
ExecutorService exec=Executors.newFixedThreadPool(MAX_SIZE);
for(int i=0;i<10*MAX_SIZE;i++){
exec.execute(new Runnable(){
public void run(){
try{
Thread.sleep(Long.MAX_VALUE); //运行时间较长的任务提交给线程池
}catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
//当把一些运行时间较长的任务提交给线程池时,线程池中的任务数量在长时间内都不会变化,
//这就可以进行一些判断,例如测试线程池是否能够按照预期的方式扩展
for(int i=0;i<20&&threadFactory.numCreated.get()<MAX_SIZE;i++)
Thread.sleep(100);
assertEquals(threadFactory.numCreated.get(),MAX_SIZE);
exec.shutdown();
}
}