欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

终止阻塞的线程

程序员文章站 2024-01-13 19:30:04
...

原文出处:开源中国,文章链接地址 https://my.oschina.net/itblog/blog/514047  

原文作者 :摆渡者,博客地址 https://my.oschina.net/itblog

原文如下:

线程状态

    我们知道,一个线程可以处于以下四种状态之一:

    1. 新建(New):当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必须的系统资源,并执行了初始化。此刻线程已经有资格获取CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。

    2. 就绪(Runnable):在这种状态下,只要调度器将CPU时间片分给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。

    3. 阻塞(Blocked):线程能够运行,但有某个或多个条件阻止它运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间片。直到线程重新进入了就绪状态,它才有可能执行操作。

    4. 死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已经结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以不被中断。

进入线程状态

    而一个任务进入阻塞状态,可能由以下原因造成:

    1. 通过调用sleep(milliseconds)方法使任务进入休眠状态,在这种情况下,任务在指定的时间内不会运行。

    2. 通过调用wait()方法使线程挂起。直到线程得到了notify()或notifyAll()消息(或者在JavaSE5的java.util.concurrent类库中等价的signal()活signalAll()消息),线程才会进入就绪状态。

    3. 任务在等待某个I/O操作完成。

    4. 任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。

    在较早的代码中,也可能会看到用suspend()和resume()方法来阻塞和唤醒线程,但是在Java新版本中这些方法被废弃了,因为它们可能导致死锁。stop()方法也已经被废弃了(JDK1.6以后),因为它不释放线程获得的锁,并且如果线程处于不一致的状态,其他任务可以在这种状态下浏览并修改它们。

    现在我们需要查看的问题是:有事你希望能够终止处于阻塞状态的任务。如果对于阻塞装填的任务,你不能等待其到达代码中可以检查其状态值的某一点,因而决定让它主动终止,那么你就必须强制这个任务跳出阻塞状态。

中断

    正如你所想象的,在Runnable.run()方法的中间打断它,与到达程序准备好离开该方法的其他一些地方相比,要复杂得多。因为当你打断被阻塞的任务时,可能需要清理资源。正因为这一点,在任务的run()方法中间打断,更像是抛出的异常,因此在Java线程中的这种类型的异常中断中用到了异常。为了在以这种方式终止任务时返回良好的状态,你必须仔细考虑代码的执行路径,并仔细编写catch字句以便正确的清除所有事物。

    Thread类包含了interrupt()方法,因此你可以终止被阻塞的任务,这个方法将设置线程的中断状态。如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException。当抛出该异常或者该任务调用Thread.interrupted()时,中断状态将被复位。正如你将看到的,Thread.interrupted()提供了离开run()循环而不抛出异常的第二种方式。

    为了调用interrupt(),你必须持有Thread对象。你可能已经注意到了,新的concurrent类库似乎在避免对Thread对象上的直接操作,转而尽量的通过Executor来执行所有操作。如果你在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的线程。这么做是有意义的,因为当你完成工程中的某个部分或者整个程序时,通常会希望同时关闭某个特定Executor的所有任务。然而,你有时也会希望只中断某个单一任务。如果使用Executor,那么通过调用submit()方法而不是execute()方法来启动任务,就可以持有该任务的上下文。submit()将返回一个泛型Future<?>,其中有一个未修饰的参数,因为你永远都不会在其上调用get()——持有这种Future的关键在于你可以在其上调用cancel(),并因此可以使用它来中断某个特定任务。如果你将true传递给cancel(),那么它就会拥有在该线程上调用interrupt()以停止这个线程的能力。因此,cancel是一种中断由Executor启动的单个线程的方式。

    下面的示例使用Executor展示了基本的interrupt()用法:

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SleepBlocked implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException");
        }
        System.out.println("Exiting SleepBlocked.run()");
    }
}

class IOBlocked implements Runnable {
    private InputStream in;
    public IOBlocked(InputStream is) {
        in = is;
    }
    @Override
    public void run() {
        try {
            System.out.println("Waiting for read():");
            in.read();
        } catch (IOException e) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted from Blocked I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Exiting IOBlocked.run()");
    }
}

class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while(true) {
            //永不释放获得的锁
            Thread.yield();
        }
    }
    public SynchronizedBlocked() {
        //在构造的时候就获取该对象的锁
        new Thread(){
            @Override
            public void run() {
                f();
            }
        };
    }
    
    @Override
    public void run() {
        System.out.println("Trying to call f()");
        f();
        System.out.println("Exiting SynchronizedBlocked.run()");
    }
}

public class Interrupting {
    private static ExecutorService exec = Executors.newCachedThreadPool();
    static void test(Runnable r) throws InterruptedException {
        Future<?> future = exec.submit(r);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Interrupting " + r.getClass().getName());
        future.cancel(true);//如果在运行的话,中断该线程。
        System.out.println("Interrupting sent to " + r.getClass().getName());
    }
    
    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Aborting with System.exit(0);");
        //强行停止退出
        System.exit(0);
    }
}

执行结果:

Interrupting SleepBlocked
Interrupting sent to SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
Waiting for read():
Interrupting IOBlocked
Interrupting sent to IOBlocked
Trying to call f()
Interrupting SynchronizedBlocked
Interrupting sent to SynchronizedBlocked
Aborting with System.exit(0);

    上面的每个任务都表示了一种不同类型的阻塞。SleepBlock是可中断的阻塞示例,而IOBlocked和SynchronizedBlocked是不可中断的阻塞示例。这个程序证明I/O和在synchronized块上的等待是不可中断的,但是通过浏览代码,你也可以预见到这一点——无论是I/O还是尝试调用synchronized方法,都不需要任何InterruptedException处理器。

    两个类很简单直观:在第一个类中run()方法调用了sleep(),在第二个类中调用了read()。但是为了演示SynchronizedBlock,我们必须首先获得锁。这是通过在构造器中创建匿名的Thread类的实例来实现的,这个匿名Thread类的对象通过调用f()获得了对象锁(这个线程必须有别于为启动SynchronizedBlock.run()的线程,因为同一个线程可以多次获得某个对象锁,你将在稍后看见)。由于f()永远都不反回,因此这个锁永远不会释放,而SynchronizedBlock.run()在试图调用f(),并阻塞以等待这个锁被释放。

    从输出中可以看到,你能够中断对sleep()的调用(或者任何要求抛出InterruptedException的调用)。但是你不能中断正在试图获取synchronized锁或者正在试图执行I/O操作的线程。这有点令人烦恼,特别是在创建执行I/O任务时,因为这意味着I/O具有锁住你的多线程程序的潜在可能。特别是对于急于Web的程序,这更是关乎厉害。

    对于这类问题,有一个略显笨拙但是确实行之有效的解决方案,那就是关闭任务在其上发生阻塞的资源

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseResource {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InputStream stream = new Socket("localhost", 8080).getInputStream();
        service.execute(new IOBlocked(stream));
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Shutting down all threads");
        service.shutdownNow();//尝试停止所有正在执行的任务
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Closing " + stream.getClass().getName());
        stream.close();//通过关闭线程操作的资源来释放阻塞的线程
    }
}

执行结果:

Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Interrupted from Blocked I/O
Exiting IOBlocked.run()

    在shutdownNow()被调用之后以及在输入流上调用close()之前的延迟强调的是一旦底层资源被关闭,任务将解除阻塞。

    幸运的是,各种NIO类提供了更人性化的I/O中断。被阻塞的nio通道会自动地响应中断

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class NIOBlocked implements Runnable {
    private final SocketChannel channel;
    
    public NIOBlocked(SocketChannel channel) {
        this.channel = channel;
    }
    
    @Override
    public void run() {
        try {
            System.out.println("Waiting for read() in " + this);
            channel.read(ByteBuffer.allocate(1));//阻塞当前任务
        } catch (ClosedByInterruptException e) {
            System.out.println("ClosedByInterruptException");
        } catch (AsynchronousCloseException e) {
            System.out.println("AsynchronousCloseException");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Exiting NIOBlocked.run() " + this);
    }
}

public class NIOInterruption {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
        SocketChannel sc1 = SocketChannel.open(isa);
        SocketChannel sc2 = SocketChannel.open(isa);
        Future<?> f = service.submit(new NIOBlocked(sc1));
        service.execute(new NIOBlocked(sc2));
        //尝试关闭任务,但由于任务处于阻塞状态,关闭不了。
        service.shutdown();
        TimeUnit.SECONDS.sleep(1);
        // 通过在channel1上调用cancel来产生中断
        f.cancel(true);
        TimeUnit.SECONDS.sleep(1);
        // 释放channel2
        sc2.close();
    }
}

执行结果:

Waiting for read() in [email protected]
Waiting for read() in [email protected]
ClosedByInterruptException
Exiting NIOBlocked.run() [email protected]
AsynchronousCloseException
Exiting NIOBlocked.run() [email protected]

    如你所见,你还可以关闭底层资源以释放锁,尽管这种做法一般不是必须的。注意,使用execute()来启动两个任务,并调用service.shutdownNow()将可以很容易的终止所有事物,而对于捕获上面示例中的Future,只有在将中断发送给一个线程,同时不发送给另一个线程时才是必须的。

相关标签: 线程 阻塞