欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

停止基于线程的Service--JCIP7.2读书笔记

程序员文章站 2022-04-19 08:33:56
...

[本文是我对Java Concurrency In Practice 7.2的归纳和总结.  转载请注明作者和出处,  如有谬误, 欢迎在评论中指正. ]

以ExecutorService为例, 该类内部封装有多个线程, 类外部无法直接停止这些线程. 相反, 外部调用Service的shutDown和shutDownNow方法关闭Service, 而Service负责停止其拥有的线程.

大多数server应用会使用到log, 下例中的LogWriter是一个使用生产者消费者模式构建的log service, 需要打印log的线程将待打印的内容加入到阻塞队列中, 而logger线程则不断的从阻塞队列中取出数据输出:

public class LogWriter {
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;

	public LogWriter(Writer writer) {
		this.queue = new LinkedBlockingQueue<String>(CAPACITY);
		this.logger = new LoggerThread(writer);
	}

	public void start() {
		logger.start();
	}

	/**
	 * 需要打印数据的线程调用该方法, 将待打印数据加入阻塞队列
	 */
	public void log(String msg) throws InterruptedException {
		queue.put(msg);
	}

	/**
	 * 负责从阻塞队列中取出数据输出的线程
	 */
	private class LoggerThread extends Thread {
		private final PrintWriter writer;
		// ...
		public void run() {
			try {
				while (true)
					writer.println(queue.take());
			} catch (InterruptedException ignored) {
			} finally {
				writer.close();
			}
		}
	}
}

LogWriter内部封装有LoggerThread线程, 所以LogWriter是一个基于线程构建的Service. 根据ExecutorService的经验, 我们需要在LogWriter中提供停止LoggerThread线程的方法. 看起来这并不是很难, 我们只需在LogWriter中添加shutDown方法:

/**
 * 该方法用于停止LoggerThread线程
 */
public void shutDown() {
	logger.interrupt();
}

当LogWriter.shutDown方法被调用时, LoggerThread线程的中断标记被设置为true, 之后LoggerThread线程执行queue.take()方法时会抛出InterruptedException异常, 从而使得LoggerThread线程结束.

然而这样的shutDown方法并不是很恰当: 

1. 丢弃了队列中尚未来得及输出的数据.

2. 更严重的是, 假如线程A对LogWriter.log方法的调用因为队列已满而阻塞, 此时停止LoggerThread线程将导致线程A永远阻塞在queue.put方法上.

对上例的改进:

public class LogWriter {
	private final BlockingQueue<String> queue;
	private final LoggerThread loggerThread;
	private final PrintWriter writer;

	/**
	 * 表示是否关闭Service
	 */
	private boolean isShutdown;
	/**
	 * 队列中待处理数据的数量
	 */
	private int reservations;

	public void start() {
		loggerThread.start();
	}

	public void shutDown() {
		synchronized (this) {
			isShutdown = true;
		}
		loggerThread.interrupt();
	}

	public void log(String msg) throws InterruptedException {
		synchronized (this) {
			// service已关闭后调用log方法直接抛出异常
			if (isShutdown)
				throw new IllegalStateException("Service has been shut down");
			++reservations;
		}
		// BlockingQueue本身就是线程安全的, put方法的调用不在同步代码块中
		// 我们只需要保证isShutdown和reservations是线程安全的即可
		queue.put(msg);
	}

	private class LoggerThread extends Thread {
		public void run() {
			try {
				while (true) {
					try {
						synchronized (this) {
							// 当service已关闭且处理完队列中的所有数据时才跳出while循环
							if (isShutdown && reservations == 0)
								break;
						}
						String msg = queue.take();
						synchronized (this) {
							--reservations;
						}
						writer.println(msg);
					} catch (InterruptedException e) {
						// 发生InterruptedException异常时不应该立刻跳出while循环
						// 而应该继续输出log, 直到处理完队列中的所有数据
					}
				}
			} finally {
				writer.close();
			}
		}
	}
}

上面的处理显得过于复杂, 利用ExecutorService可以编写出相对更简洁的程序:

public class LogService {
	/**
	 * 创建只包含单个线程的线程池, 提交给该线程池的任务将以串行的方式逐个运行
	 * 本例中, 此线程用于执行打印log的任务
	 */
	private final ExecutorService exec = Executors.newSingleThreadExecutor();
	private final PrintWriter writer;

	public void start() {
	}

	public void shutdown() throws InterruptedException {
		try {
			// 关闭ExecutorService后再调用其awaitTermination将导致当前线程阻塞, 直到所有已提交的任务执行完毕, 或者发生超时
			exec.shutdown();
			exec.awaitTermination(TIMEOUT, UNIT);
		} finally {
			writer.close();
		}
	}

	public void log(String msg) {
		try {
			// 线程池关闭后再调用其execute方法将抛出RejectedExecutionException异常
			exec.execute(new WriteTask(msg));
		} catch (RejectedExecutionException ignored) {
		}
	}
	
	private final class WriteTask implements Runnable {
		private String msg;
		
		public WriteTask(String msg) {
			this.msg = msg;
		}

		@Override
		public void run() {
			writer.println(msg);
		}
	}
}