欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并行开发笔记6

程序员文章站 2022-05-22 18:08:32
...
对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

    在下面的程序中给出了一个简单的日志服务示例,其中日志操作在单独的日志线程中执行。产生日志消息的线程并不会将消息直接写入输出流,而是由LogWriter通过BlockingQueue将消息提交给日志线程,并由日志线程写入。这是一种多生产者单消费者(Multiple-Producer,Single-Consumer)的设计方式:每个调用log的操作都相当于一个生产者,而后台的日志线程则相当于消费者。如果消费者的处理速度低于生产者的生成速度,那么BlockingQueue将阻塞生产者,直到日志线程有能力处理新的日志消息。

 

   不支持关闭的生产者-消费者日志服务

public class LogWriter {
	private static final int CAPACITY = 5;
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;
	
	public LogWriter(Writer writer){
		this.queue = new LinkedBlockingDeque<String>(CAPACITY);
		this.logger = new LoggerThread(writer);
	}
	
	public void start(){
		logger.start();
	}
	
	public void log(String msg) throws InterruptedException{
		queue.put(msg);
	}
	
	private class LoggerThread extends Thread{
		private final PrintWriter writer;
		
		public LoggerThread(Writer writer){
			this.writer = (PrintWriter) writer;
		}
		
		public void run(){
			try{
				while(true){
					writer.println(queue.take());
				}
			}catch(InterruptedException innored){
			}finally{
				writer.close();
			}
		}
	}
}

    一般地,像Logwriter这样的服务在软件产品中能发挥实际的作用,还需要实现一种终止日志线程的方法,从而避免使JVM无法正常关闭。

   一种关闭LogWriter的方法是:设置某个“已请求关闭”标志,以避免进一步提交日志消息。但存在竞态问题,提供可靠关闭操作能够解决该问题,因而要使日志消息的提交操作成为原子操作。具体代码如下:

public class LogService {
	private static final int CAPACITY = 5;
	private final BlockingQueue<String> queue;
	private final LoggerThread loggerThread;
	private final PrintWriter writer;
	
	@GuardedBy("this") private boolean isShutdown;
	@GuardedBy("this") private int reservations;  //预约
	
	public LogService(PrintWriter writer){
		this.queue = new LinkedBlockingQueue<String>(CAPACITY);
		this.writer = writer;
		loggerThread = new LoggerThread();
	}
	
	
	public void start(){
		loggerThread.start();
	}
	
	public void stop(){
		synchronized(this){
			isShutdown = true;
		}
		loggerThread.interrupt();
	}
	
	public void log(String msg) throws InterruptedException{
		synchronized(this){
			if(isShutdown){
				throw new IllegalStateException("...");
			}
			++reservations;
		}
		queue.put(msg);
	}
	
	private class LoggerThread extends Thread{
		public void run(){
			try{
					while(true){
						try{
							synchronized(LogService.this){
								if(isShutdown && reservations == 0){
									break;
								}
							}
							String msg = queue.take();
							synchronized(LogService.this){
								--reservations;
							}
							writer.println(msg);
						}catch(InterruptedException e){
							/* retry */
						}
					}
			  }finally{
				writer.close();
			  }
		}
	}
}

   另外一种关闭生产者-消费者服务的方式就是使用“毒丸(Poison Pill)”对象:“毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止”。 示例代码如下:

public class IndexingService {
	private static final File POISON = new File("");
	private final IndexerThread consumer = new IndexerThread();
	private final CrawlerThread producer = new CrawlerThread();
	private final BlockingQueue<File> queue = new LinkedBlockingDeque<File>();
	private final File root = new File("root");
	
	
	
	public void start(){
		producer.start();
		consumer.start();
	}
	
	public void stop(){
		producer.interrupt();
	}
	
	public void awaitTermination() throws InterruptedException{
		consumer.join();
	}
	
	
	public class CrawlerThread extends Thread{
		public void run(){
			try{
				crawl(root);
			}catch(InterruptedException e){
				/* 发生异常 */
			}finally{
				while(true){
					try{
						queue.put(POISON);
						break;
					}catch(InterruptedException e1){
						/*  重新尝试 */
					}
				}
			}
		}
		private void crawl(File root) throws InterruptedException{
			/*....*/
		}
	}
	public class IndexerThread extends Thread{
		public void run(){
			try{
				while(true){
					File file = queue.take();
					if(file == POISON){
						break;
					}else{
						indexFile(file);
					}
				}
			}catch(InterruptedException consumed){
				
			}
		}

		private void indexFile(File file) {
			/* ... */
		}
	}	
}
相关标签: Java 并行计算