Java并行开发笔记6
程序员文章站
2022-05-22 18:08:32
...
对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。 |
在下面的程序中给出了一个简单的日志服务示例,其中日志操作在单独的日志线程中执行。产生日志消息的线程并不会将消息直接写入输出流,而是由LogWriter通过BlockingQueue将消息提交给日志线程,并由日志线程写入。这是一种多生产者单消费者(Multiple-Producer,Single-Consumer)的设计方式:每个调用log的操作都相当于一个生产者,而后台的日志线程则相当于消费者。如果消费者的处理速度低于生产者的生成速度,那么BlockingQueue将阻塞生产者,直到日志线程有能力处理新的日志消息。
不支持关闭的生产者-消费者日志服务
public class LogWriter { private static final int CAPACITY = 5; private final BlockingQueue<String> queue; private final LoggerThread logger; public LogWriter(Writer writer){ this.queue = new LinkedBlockingDeque<String>(CAPACITY); this.logger = new LoggerThread(writer); } public void start(){ logger.start(); } public void log(String msg) throws InterruptedException{ queue.put(msg); } private class LoggerThread extends Thread{ private final PrintWriter writer; public LoggerThread(Writer writer){ this.writer = (PrintWriter) writer; } public void run(){ try{ while(true){ writer.println(queue.take()); } }catch(InterruptedException innored){ }finally{ writer.close(); } } } }
一般地,像Logwriter这样的服务在软件产品中能发挥实际的作用,还需要实现一种终止日志线程的方法,从而避免使JVM无法正常关闭。
一种关闭LogWriter的方法是:设置某个“已请求关闭”标志,以避免进一步提交日志消息。但存在竞态问题,提供可靠关闭操作能够解决该问题,因而要使日志消息的提交操作成为原子操作。具体代码如下:
public class LogService { private static final int CAPACITY = 5; private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; @GuardedBy("this") private boolean isShutdown; @GuardedBy("this") private int reservations; //预约 public LogService(PrintWriter writer){ this.queue = new LinkedBlockingQueue<String>(CAPACITY); this.writer = writer; loggerThread = new LoggerThread(); } public void start(){ loggerThread.start(); } public void stop(){ synchronized(this){ isShutdown = true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException{ synchronized(this){ if(isShutdown){ throw new IllegalStateException("..."); } ++reservations; } queue.put(msg); } private class LoggerThread extends Thread{ public void run(){ try{ while(true){ try{ synchronized(LogService.this){ if(isShutdown && reservations == 0){ break; } } String msg = queue.take(); synchronized(LogService.this){ --reservations; } writer.println(msg); }catch(InterruptedException e){ /* retry */ } } }finally{ writer.close(); } } } }
另外一种关闭生产者-消费者服务的方式就是使用“毒丸(Poison Pill)”对象:“毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止”。 示例代码如下:
public class IndexingService { private static final File POISON = new File(""); private final IndexerThread consumer = new IndexerThread(); private final CrawlerThread producer = new CrawlerThread(); private final BlockingQueue<File> queue = new LinkedBlockingDeque<File>(); private final File root = new File("root"); public void start(){ producer.start(); consumer.start(); } public void stop(){ producer.interrupt(); } public void awaitTermination() throws InterruptedException{ consumer.join(); } public class CrawlerThread extends Thread{ public void run(){ try{ crawl(root); }catch(InterruptedException e){ /* 发生异常 */ }finally{ while(true){ try{ queue.put(POISON); break; }catch(InterruptedException e1){ /* 重新尝试 */ } } } } private void crawl(File root) throws InterruptedException{ /*....*/ } } public class IndexerThread extends Thread{ public void run(){ try{ while(true){ File file = queue.take(); if(file == POISON){ break; }else{ indexFile(file); } } }catch(InterruptedException consumed){ } } private void indexFile(File file) { /* ... */ } } }