处理不可中断的阻塞-JCIP7.1读书笔记
程序员文章站
2022-03-02 10:33:24
...
[本文是我对Java Concurrency In Practice 7.1的归纳和总结. 转载请注明作者和出处, 如有谬误, 欢迎在评论中指正. ]
并不是所有的阻塞都是可中断的, 比如InputStream.read方法. 在检测到输入数据可用, 到达流末尾或者抛出异常前, 该方法一直阻塞. 而且阻塞的时候不会检查中断标记, 所以中断线程无法使read从阻塞状态返回. 但是关闭流可以使得read方法抛出异常, 从而从阻塞状态返回.
public class ReaderThread extends Thread { private static final int BUFSZ = 1024; private final Socket socket; private final InputStream in; public ReaderThread(Socket socket) throws IOException { this.socket = socket; this.in = socket.getInputStream(); } // 覆盖Thread类的interrupt方法, 加入关闭socket的代码 // 如果发生中断时, 线程阻塞在read方法上, socket的关闭会导致read方法抛出SocketException, 然后run方法运行完毕 public void interrupt() { try { socket.close(); } catch (IOException ignored) { } finally { super.interrupt(); } } public void run() { try { byte[] buf = new byte[BUFSZ]; while (true) { int count = in.read(buf); if (count < 0) break; else if (count > 0) processBuffer(buf, count); } } catch (IOException e) { /* Allow thread to exit */ } } private void processBuffer(byte[] buf, int count) { //... } }
如果task并非在自己创建的线程里运行, 而是提交给线程池运行的话, 就无法使用上例的方式处理不可中断阻塞了. 之前有过分析, 对于提交给线程池执行的task, 应该通过Future.cancel方法提前终止task的运行, 所以可以考虑重写Future.cancel方法, 在其中加入关闭socket的操作. Future对象是由submit方法返回的, 其源代码如下:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
可知submit方法返回的Future对象是调用newTaskFor方法获得的:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
newTaskFor方法被声明为protected, 所以我们可以通过继承覆盖该方法, 返回自定义的Future对象.
首先将需要覆盖的2个方法定义在接口中:
public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); }
然后让task类实现CancellableTask接口:
public abstract class SocketUsingTask<T> implements CancellableTask<T> { private Socket socket; protected synchronized void setSocket(Socket s) { socket = s; } public synchronized void cancel() { try { if (socket != null) socket.close(); } catch (IOException ignored) { } } public RunnableFuture<T> newTask() { return new FutureTask<T>(this) { // 定义FutureTask的匿名内部类, 并覆盖cancel方法, 向其中加入关闭socket的操作 public boolean cancel(boolean mayInterruptIfRunning) { try { SocketUsingTask.this.cancel(); } finally { return super.cancel(mayInterruptIfRunning); } } }; } }
接着继承ThreadPoolExecutor类并覆盖newTaskFor方法, 让该方法返回自定义的FutureTask对象:
public class CancellingExecutor extends ThreadPoolExecutor { // ... protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { // 如果callable是CancellableTask对象, 那么就返回自定义的FutureTask(通过调用其newTaskFor方法实现) if (callable instanceof CancellableTask) return ((CancellableTask<T>) callable).newTask(); else return super.newTaskFor(callable); } }
测试代码:
public static void main(String[] args) { CancellingExecutor executor = new CancellingExecutor(); SocketUsingTask task = new SocketUsingTask(); task.setSocket(new Socket("www.baidu.com", 80)); Future<V> future = executor.submit(task); future.cancel(true); }
上一篇: 改善并发程序的可扩展性--JCIP C11读书笔记
下一篇: 中断线程--JCIP7.1读书笔记