欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java线程池

程序员文章站 2022-03-08 18:59:52
...

1. Java线程与线程池

1.1 线程

相比于“繁重”的进程,线程可以算是一种轻量级的进程,大多数操作系统都是支持以线程作为调度执行单元以提高系统的并发性。毫不例外,Java也支持多线程。

多个线程交替甚至并行的执行,特别在多处理器时代,可以极大的提高资源的利用率。通常我们使用多线程来并发的执行多个具有明显边界的任务,例如Web服务器使用多线程来同时处理来自多个用户的请求,每一个请求便是一个独立的、有边界的任务。

Java中新建一个线程通常有两种方式:

  • new Thread() 方式
Thread t = new Thread() {
  @Override
  public void run() {
    //这里是将要被执行的代码       
  };
};
  • new Thread(new Runnable())方式
Runnable task = new Runnable() {    
  @Override
  public void run() {
    //这里是将要被执行的代码   
  }
};
Thread t = new Thread(task);

第一种方式存在一个很大的缺点:被执行的代码(任务)和执行机制(如何执行这些任务)耦合了

如果将任务执行机制分别抽象出来,那么久会获得更好的灵活性。第二种通过Runnable方式创建线程的方式,正好克服了这个缺点,Runnable抽象了任务,而Thread抽象了执行机制,一个线程可以通过维护一个工作队列,采取不同的执行策略来执行多个不同的的任务。

因此,第二种创建线程的方式是使用得最多、也更推荐的方式,后面讨论的Executor等内容都会看见这种创建线程的方式。

线程创建完毕后,调用start()方法便可以启动该线程。

t.start()

Java线程大致分为两类:

  1. 守护线程 通过 方法setDaemon(true)将某个线程设置为守护线程
  2. 非守护线程 默认创建的是非守护线程

JVM启动时候,会创建一个非守护线程执行main方法,这个线程有时候也被称为主线程,然后程序可以自主的创建守护线程和非守护线程。JVM一直执行所有被创建的线程,当调用System.exit()或者JVM中所有的非守护线程都死了(正常执行run方法完毕或者未处理异常层层上抛最后抛到了JVM层),此时JVM停止。

到这里我们都在使用一个名为Thread的类,现在有这么几个问题?

  • Thread类都有哪些常用的方法?
  • Java中的线程和OS线程是一一对应的吗?亦或是多对一?甚至是多对多?

常用的方法用:

  1. start() :启动一个该线程
  2. sleep(long time): 让该线程让出CPU进入睡眠,直到指定的time后,恢复然后进入可运行状态。
  3. yield(): 主动让出CPU
  4. interrupt() : 向该线程发出一个中断请求
  5. isInterrupt(): 返回该线程是否存在中断请求
  6. interrupted(): 返回该线程是否存在中断请求,并清空线程的中断状态。

对于Java的线程和OS线程的对应关系,通过查看源代码:

 public synchronized void start() {
         ....
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

调用了start0()方法,是一个native方法:查看OpenJDK\src\share\native\java\lang\Thread.c源码

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

#undef THD
#undef OBJ
#undef STE

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

start0对应于JVM_StartThread函数,该函数是JVM提供的一个API函数,查找HotSpot下的\src\share\vm\prims\jvm.cpps,找到对应的方法:发现该出实际上new JavaThreadJavaThreadHotSpot定义的C++类。

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
      ....
      size_t sz = size > 0 ? (size_t) size : 0;
      native_thread = new JavaThread(&thread_entry, sz);   
      Thread::start(native_thread);
JVM_END

\src\share\vm\runtime\thread.cpp找到JavaThread的定义:其调用了os::create_thread()函数

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
  Thread()
  ....
  os::create_thread(this, thr_type, stack_sz);
  ...
}

其中OS::create_thread函数的实现是依赖于不同操作系统,这里选择linux

bool os::create_thread(Thread* thread, ThreadType thr_type, size_t stack_size) {
    ....
    int ret = pthread_create(&tid, &attr, (void* (*)(void*)) java_start, thread);
    ....
  return true;
}

到此看出了实际上是pthread_create为我们创建了一个线程,而Java线程OS线程是一对一的关系。

到这里,对于线程简单的总结算是完成了,我们知道当我们需要并发处理多个任务时,我们可以简单的创建一个新的线程去处理这些任务,但是这样的方式在高负载的情况下确是不明智的。

1.2 线程池

针对每一个需要被执行的任务都创建一个线程来执行的方式虽然很直观,但是存在如下缺点:

  • 线程创建与销毁代价非常高,当任务到达的速度特别快时,为每个任务创建一个线程会快速消耗掉系统资源。
  • 当线程的数量大于CPU的数量的时候,当CPU正在100%工作的时候,这时为每一个到来的任务创建一个线程只会耗费额外的内存,而不会提高资源利用率

由于不能无限制的创建线程,因此将一定数量的线程放在一个池中形成一个线程池,同时维护一个任务队列,让这些线程不停的去执行任务,是一个有效解决上述缺点的好办法。

线程池主要用来解决线程生命周期开销问题资源不足问题。通过对多个任务重复使用线程,线程创建的开销就被分摊到了多个任务上了,而且由于在新任务到达时线程已经存在,所以消除了线程创建所带来的延迟。这样,就可以立即执行新的任务,使响应更快。另外,通过适当的动态的调整线程中的线程数目可以防止出现资源不足的情况。

使用线程池提交任务这种模型和生产者-消费者模式是何其的相似。

任务提交者便是生产者

线程池维护的工作队列便是缓冲区

线程池维护的那些线程便是消费者

Java线程池

对于一个线程池来讲,它应该需要关注的主要是:

  • 创建多少个线程?
  • 工作队列中的任务数量是否有上边界?
  • 当工作队列的任务数量少于线程数量的时候,需要回收线程吗?
  • 这么多线程并发的访问工作队列等共享资源时,如何正确的进行同步而又不降低效率?
  • 如何让任务提交者对已经提交的任务进行操作(获取结果、取消任务)?
  • 如何优雅的关闭线程池?
  • ……

上述每一个关注点无疑都是需要精心设计、反复推敲的,如果from scratch构建一个线程池,对实现者的要求是极高的。因此Java为广大用户提高了一个线程池框架,通过该框架我们可以较容易的完成一些常见的多线程任务。

2. Executor框架

Java提供的线程池框架主要涉及到如下的类和接口:

  • Executors: 通过很多静态方法,提供不同的预配置的线程池;
  • Executor: Executor最上层的接口,只包含execute(Runnable command)方法;
  • ExecutorService: Executor的子接口;包含很多有用的方法,例如submmit()、shutDown()、shutDownNow()、awaitTermination()、invokeAll();
  • ThreadPoolExecutor: 具体的线程池的实现类;

2.1 Executors

工具类Executors主要提供以下几种预配置的线程池:

  1. newFixedThreadPool(int nThreads) 创建一个包含nThreads个线程的线程池,共享一个无边界的工作队列,在任何时刻,线程池最多有nThreads个存活线程;当某个线程由于执行过程中出现错误而死亡,则新建一个线程以补充。

    例如:创建一个包含5个线程的线程池

    Executor exe = Executors.newFixedThreadPool(5);
    exe.execute(new Runnable() {
    
     @Override
     public void run() {
       // TODO Auto-generated method stub
    
     }
    });

  2. newSingleThreadExecutor()创建仅仅包含一个线程的线程池,维护着一个无边界的工作队列,在任何时刻,线程池只能有一个任务被执行,任务被保证顺序的执行。如果线程由于执行过程中出现错误而死亡,则新建一个线程代替继续执行任务;和newFixedThreadPool(1)不一样,一旦线程池创建,该线程池不能再进行配置。这是通过将ThreadPoolExecutor包装成包装类实现的,因此该方法返回的Executor不能强制转化为ThreadPoolExecutor;

    public static ExecutorService newSingleThreadExecutor() {
           return new FinalizableDelegatedExecutorService //包装类
              (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    }
    ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newSingleThreadExecutor(); //强转将失败

  3. newCachedThreadPool()创建一个线程池,当新任务被提交,如果池中没有空余的存活线程,则该线程池会创建新的线程;如果有存活的多余的线程则会复用该线程;当某个线程空闲超过60秒的时候,该线程会被终止然后被移除线程池。这种线程池适合被用于处理大量耗时短的任务,因为设置了一个空闲超时时间,这样当整个线程池都闲下来时,基本不会暂用额外的资源。

    Executor exe = Executors.newCachedThreadPool();
    exe.execute(new Runnable() {
    
     @Override
     public void run() {
       // TODO Auto-generated method stub
    
     }
    });

  4. newSingleThreadScheduledExecutornewThreadScheduledExecutor(int coreThreadSize)分别是创建包含一个线程和指定数量的线程,该线程池的线程定时的执行一些任务;

    以上是Executors包含的一些常用的静态方法,它为我们预配置一些常用的线程池,但是在某些时候,这些预配置的满足不了需求,JDK也通过ThreadPoolExecutor重载的构造方法让使用者根据自己的需求进行线程池配置。

2.2 ThreadPoolExecutor

上述提到的Executors中的静态方法返回的预配置线程池,也是通过调用不同ThreadPoolExecutor的构造方法完成的。

具体来讲,ThreadPoolExecutor构造方法有几个:

  • public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)
  • public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
  • public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
  • public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

综合来看就是如下几个配置参数:

  • corePoolSize

    指定常驻线程池中的线程的数量;当由于某些原因,线程池中的线程数量小于corePoolSize时,线程池负责创建新的线程来补充,直到数量达到corePoolSize。

  • maximumPoolSize

    指定线程池中最多的线程数量,maximumPoolSize >= corePoolSize恒成立。对于newFixedThreadPoolnewSingleThreadPool的配置为maximumPoolSize = corePoolSize;而对于newCachedThreadPool中的配置为maximumPoolSize = Integer.MAX_VALUE > corePoolSize = 0

  • keepAliveTime/unit

    当线程池中线程数量大于corePoolSize的时候,对于的线程的空闲时间超过keepAliveTime后,会被终止,从线程池中移除。在newCachedThreadPool中,这个值默认是60秒。unit为时间单位。

  • workQueue

    线程池中用于维护那些已经提交但还未被执行的任务;

  • threadFactory

    负责创建线程的线程工厂;

  • handler

    当工作队列里面待处理的任务已经达到工作队列的最大容量时或者线程池已经(或正在)关闭,其他任务提交者提交的任务将会被拒绝执行;线程池会回调这个handler来通知这次拒绝。

示例:

创建一个常驻线程数量为2的,最大线程数量为5的,多余线程空闲超时为30秒的, *工作队列的线程池:

Executor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

3. Future、Callable

现在有了配置好的线程池,接下来便是向线程池提交我们的任务。

通常来讲,任务按有无返回值可以分为两类:

  • 无返回值:这类任务不关心返回值,例如复制一个文件;
  • 有返回值:这类任务关系返回值,例如请求一个网络资源;

JDK提供两种相似的接口来达到上述目的:

  • Runnable : 包含void run()方法,无返回值;也不会抛出受检查的异常;
  • Callable : 包含V call(), 有返回值,会抛出受检查的异常;

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

无论是Runnable还是Callable,任务提交者希望可以在向线程池提交任务后,获得一个关于该任务的Handle,以便任务提交者对任务进行控制(取消任务、查看任务是否完成、获取任务执行结果)这种类似于Handle的便是Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,       TimeoutException;
}

示例:

当提交一个获取网页的任务后,主线程通过Future获取任务执行结果:

ExecutorService e = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), rEH);
        Callable<String> call = new Callable<String>() {
            @Override
            public String call() throws Exception {
                // TODO Auto-generated method stub
                //模拟网络请求
                Thread.sleep(500);
                return "TEST";
            }
        };
        Future<String> f = e.submit(call);
        String res = f.get();//阻塞直到结果返回
        System.out.println(res);

4. References

  1. http://www.baeldung.com/thread-pool-java-and-guava
  2. Java Conccurency in Practice
  3. http://www.blogjava.net/stevenjohn/archive/2011/12/12/366161.html