欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

线程池ThreadPoolExecutor实现原理

程序员文章站 2022-11-11 13:23:59
线程属于稀缺资源,对于线程的创建规则,引用《阿里巴巴 Java 手册》中的一条进行说明。 本篇从源码方面介绍ThreadPoolExecutor对象,并简要解析线程池工作原理。 首先ThreadPoolExecutor中定义了几个线程池状态常量。 RUNNING是运行状态,线程池可以接收新任务 SH ......

  线程属于稀缺资源,对于线程的创建规则,引用《阿里巴巴 java 手册》中的一条进行说明。

线程池ThreadPoolExecutor实现原理

 

 

 

 

  本篇从源码方面介绍threadpoolexecutor对象,并简要解析线程池工作原理。

  首先threadpoolexecutor中定义了几个线程池状态常量

// runstate is stored in the high-order bits
    private static final int running    = -1 << count_bits;
    private static final int shutdown   =  0 << count_bits;
    private static final int stop       =  1 << count_bits;
    private static final int tidying    =  2 << count_bits;
    private static final int terminated =  3 << count_bits;
  • running是运行状态,线程池可以接收新任务
  • shutdown是在调用shutdown()方法以后处在的状态。表示不再接收新任务,但队列中的任务可以执行完毕
  • stop是在调用shutdownnow()方法以后的状态。不再接收新任务,中断正在执行的任务,抛弃队列中的任务
  • tidying表示所有任务都执行完毕
  • terminated为中止状态,调用terminated()方法后,尝试更新为此状态

  看一下构造方法的参数

public threadpoolexecutor(int corepoolsize,int maximumpoolsize,long keepalivetime,timeunit unit,blockingqueue<runnable> workqueue,
                              threadfactory threadfactory, rejectedexecutionhandler handler) {        
    }
  • corepoolsize是线程池的核心线程数
  • maximumpoolsize是线程池允许的最大线程数
  • keepalivetime为线程空闲时的存活时间
  • unit是keepalivetime的单位
  • workqueue是用来保存等待被执行的线程的队列
  • threadfactory,线程工厂,通常使用默认工厂,定义了线程名称生成规则。
  • handler是当最大线程数和队列都满了以后,线程池的处理策略

  这里可以简单看下jdk中已有的4种饱和策略handler:

  • abortpolicy:默认策略,直接抛出异常,throw new rejectedexecutionexception
  • callerrunspolicy:如果线程池没有中止,直接用调用者的线程资源执行任务
  • discardoldestpolicy:如果线程池没有中止,移除队列第一个任务,再执行当前任务
  • discardpolicy:什么也不做,直接抛弃这个任务

  也可以自定义饱和策略,只需实现rejectedexecutionhandler接口。

  

  再看一下jdk中的workqueue队列:

  • arrayblockingqueue:基于数组实现
  • linkedblockingqueue:基于链表实现,默认容量integer.max_value
  • synchronousquene:不存储元素,每个插入操作必须等待另一个线程调用移除操作
  • priorityblockingquene:具有优先级的队列

 

  jdk提供了executors工厂类。这里面有几种实例化线程池的方法:

public static executorservice newfixedthreadpool(int nthreads) {
        return new threadpoolexecutor(nthreads, nthreads,
                                      0l, timeunit.milliseconds,
                                      new linkedblockingqueue<runnable>());
    }

  指定了线程数,且corepoolsize == maximumpoolsize。

 

public static executorservice newcachedthreadpool() {
        return new threadpoolexecutor(0, integer.max_value,
                                      60l, timeunit.seconds,
                                      new synchronousqueue<runnable>());
    }

  可缓存线程池。最大线程数达到integer.max_value。所以在使用该线程池时,一定要控制并发数,不然会创建很多线程,占用大量资源。

 

public static executorservice newsinglethreadexecutor() {
        return new finalizabledelegatedexecutorservice
            (new threadpoolexecutor(1, 1,
                                    0l, timeunit.milliseconds,
                                    new linkedblockingqueue<runnable>()));
    }

  只有一个线程的线程池,可以保证提交任务的顺序执行。

 

  上面的实例方法都很方便,但是在开发中创建线程池时,最好不好使用executors类中的初始化方法。见《阿里巴巴 java 手册》:

  线程池ThreadPoolExecutor实现原理

  

  最后看一下threadpoolexecutor执行任务的方法

public void execute(runnable command) {
        if (command == null)
            throw new nullpointerexception();
        int c = ctl.get();
        if (workercountof(c) < corepoolsize) {
            if (addworker(command, true))
                return;
            c = ctl.get();
        }
        if (isrunning(c) && workqueue.offer(command)) {
            int recheck = ctl.get();
            if (! isrunning(recheck) && remove(command))
                reject(command);
            else if (workercountof(recheck) == 0)
                addworker(null, false);
        }
        else if (!addworker(command, false))
            reject(command);
    }
  • 判断command是否为空
  • 获取线程状态
  • 计算线程池中的线程数量,如果数量小于corepoolsize,就创建一个新线程执行任务
  • 如果线程池正在运行状态,且写入队列成功。
  • 再次获取线程池状态。判断,如果线程状态变成了非运行状态,就从队列中移除任务,调用reject()方法执行饱和策略handler
  • 如果线程池为空,就创建一个新线程执行任务
  • 如果第4步判断没有通过,尝试建立线程执行任务,若没有成功,就执行饱和策略handler

  以一张简单的流程图描述上面的步骤:

  线程池ThreadPoolExecutor实现原理