欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

task与execution--JCIPC08读书笔记

程序员文章站 2022-04-18 21:29:56
...

[本文是我对Java Concurrency In Practice C08的归纳和总结.  转载请注明作者和出处,  如有谬误, 欢迎在评论中指正. ]

task和线程池执行机制之间隐式的耦合

前面曾提到过, 线程池的应用解耦了task的提交和执行. 事实上, 这有所夸大, 因为不是所有的task都适用于所有的执行机制, 某些task要求在特定的线程池中执行:

1. 非独立task, 指的是依赖于其他task的任务. 

2. 要求在单线程中运行的task. 某些task不是线程安全的, 无法并发运行. Executors.newSingleThreadExecutor()方法返回的线程池只包含单个线程, 提交给该线程池的task将缓存在一个*队列中, 线程池中所包含的单个线程将依次从队列中取出task运行.

3. 响应时间敏感的task. 某些task要求必须在极短的时间内开始执行, 比如GUI应用中处理用户点击操作的task. 假如提交给某一线程池的task既包含long-running task, 也包含响应时间敏感的task, 那么响应时间敏感的task可能无法在极短的时间内得到执行. 

4. 使用了ThreadLocal类的task. 线程池的标准实现可能会在空闲时销毁多余的线程, 繁忙时创建更多的线程, 更有可能重用线程. 所以使用了ThreadLocal的task不应该提交给线程池运行, 除非ThreadLocal的使用只限定在单个task内, 不用于多个task之间通信.

 

线程饥饿死锁

如果提交给线程池运行的task之间不是相互独立的, 就有可能出现线程饥饿死锁. 比如提交给SingleThreadExecutor执行的2个task, task A在执行过程中需要等待task B的执行结果才能继续, 而此时没有多余的线程用于执行task B, 如此就发生了线程饥饿死锁.

public class StarvationDeadLock {
	public static void main(String[] args) {
		final ExecutorService executor = Executors.newSingleThreadExecutor();
		final Runnable taskB = new Runnable() {
			@Override
			public void run() {
				//...
			}
		};
		Runnable taskA = new Runnable() {
			@Override
			public void run() {
				Future<?> future = executor.submit(taskB);
				try {
					System.out.println("waiting for taskB complete");
					// get方法将阻塞, 直到taskB执行完成
					// 但是由于线程池中只有一个线程, 而该线程已经被taskA占用, 所以taskB将没有机会执行. 
					// 此时就发生了线程饥饿死锁
					future.get();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
				//...
			}
		};
		executor.submit(taskA);
	}
}

不仅SingleThreadExecutor执行相互依赖的task时会发生死锁, 其他线程池执行相互依赖的task时也可能发生死锁:

public class StarvationDeadLock {
	public static void main(String[] args) {
		final ExecutorService executor = Executors.newFixedThreadPool(3);
		// 设定await在Barrier对象上的线程数达到4个时, 其await方法才释放
		final CyclicBarrier barrier = new CyclicBarrier(4);
		
		// 重复提交4个task, 每个task都await在barrier对象上
		// barrier的await方法将一直阻塞, 直到4个线程都到达await点.
		// 但是线程池中只有3个线程, 不可能出现4个线程都达到await点的情形, 所以依然会发生死锁
		for (int i = 0; i < 4; i++) {
			executor.submit(new Runnable() {
				@Override
				public void run() {
					try {
						System.out.println("waiting for other tasks arriving at common point");
						barrier.await();
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
					} catch (BrokenBarrierException e) {
						e.printStackTrace();
					}
				}
			});
		}
	}
} 

避免相互依赖的task提交给同一线程池执行时发生死锁的唯一方法是: 线程池中的线程足够多. 

 

确定线程池的size

如果线程池的size过大, 将造成内存等资源的浪费, 甚至使得资源耗尽. 如果线程池的size过小, 将造成CPU的利用率不高. 确定合适的size需要考虑:CPU数, 内存, 是计算密集型task还是I/O密集型task, 是否需要获取稀缺资源(比如数据库连接)等.

对于计算密集型task, 合适的size大约为CPU数量+1. 对于I/O占较大比例的task, 合适的size可以通过以下公式确定: size = CPU数量 * CPU利用率 * (1 + I/O时间比例). Runtime.getRuntime().availableProcessors()返回CPU的个数.

当然, 实际开发中size还受到内存, 文件句柄, socket, 数据库连接数等稀缺资源的约束. 将总的稀缺资源除以每一个task使用的资源数, 能得到线程数的上限. 

 

循环并行化

如果循环体所进行的操作是相互独立的, 这样的循环可以并发的运行:

// 循环操作
void processSequentially(List<Element> elements) {
	for (Element e : elements)
		process(e);
}

// 将相互独立的循环操作转变为并发操作
void processInParallel(Executor exec, List<Element> elements) {
	for (final Element e : elements) {
		exec.execute(new Runnable() {
			public void run() {
				process(e);
			}
		});
	}
	exec.shutdown(); 
	exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} 

如果希望同时提交一系列task, 并且等待它们执行完毕, 可以调用ExecutorService.invokeAll方法.

如果希望task执行完毕之后就获取其执行结果, 可以使用CompletionService.