欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ThreadPoolExecutor基础使用

程序员文章站 2024-03-02 14:58:40
...

前言

Android开发中由于禁止在主线程中做网络请求,通常都需要使用线程对象来做异步请求操作,但是直接使用new Thread();创建新线程需要不停的申请系统资源,这些野生的线程缺乏统一管理,相互竞争占用过多系统资源;直接使用普通的线程做定期执行和线程中断等功能特别容易出错,为此 J.U.C类库里添加了功能强大的Executor框架,它能够为用户程序提供强大的线程池实现。使用JDK自带的线程池功能有如下的优点:

  1. 可以重用已有的线程,减少对象创建和销毁;
  2. 可以被有效控制最大并发线程数,提高系统资源利用率,避免过多资源竞争;
  3. 提供定时执行定期执行单线程和并发数据控制等功能

基础介绍

J.U.C的Excutor框架最主要的子类就是ThreadPoolExecutor,它是大部分线程池的底层实现对象,通常只需要学会如何配置它的运行参数就能够产生功能相当强大的线程池实现。它的构造函数主要包含如下几个参数:

参数名 作用
corePoolSize 核心线程数量
maximumPoolSize 线程最大线程数
workQueue 阻塞队列,存储等待执行的任务会对线程池线程运行过程产生重大影响
keepAliveTime 线程没有任务执行是最多保持多久时间终止
unit 时间单位
threadFactory 线程工厂,用来创建线程
rejectedExcutionHandler 如果提交的任务太多无法被处理,需要执行的拒绝策略

线程池合理配置,需要尽量压榨CPU,CPU密集型任务核心线程数参考值可以设置为NCPU+1,IO密集型任务参考值可以设置为2*NCPU

线程池内部使用状态机管理状态,主要分成运行、关闭、停止、清理、终止五种状态,它们之间的相互转换如下:
ThreadPoolExecutor基础使用
在关闭状态之前提交的任务会被继续执行完成,但新添加的任务会被抛弃,关闭状态会直接停止所有线程和正在执行的任务,清理完了任务和工作线程之后进入清理状态,最后完成终止回调进入终止状态,线程池就不能再使用了。

普通的Thread请求无法返回结果,而很多网络请求就是为了从服务器获取数据,直接使用Runnable就需要使用共享变量来接收返回结果,这种跨线程的数据请求很容易出现问题。Executor框架为用户提供了FutureTask类,它实现了RunnableFuture接口,而RunnableFuture实现了Runnable和Future两个接口,Future代表一个执行结果对象,调用它的get/cancel/isDown方法能够获取、取消和查看执行结果,传递进线程池的任务通常都会被封装成FutureTask对象来执行。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

上面的代码就是AbstractExecutorService几个需要返回结果的执行接口,它们都会把Runnable或者Callable对象封装到RunnableFuture对象中去,execute接口不会做任何封装,它只接受Runnable类型的任务对象。

正常执行测试

前面已经了解过线程池对象的几个参数,现在使用自定义生成的参数来配置一个线程池,测试它在不同情况下的执行效果。首先为线程池添加自定义的ThreadFactory对象,这个线程工厂每次产生新线程的时候都会打印创建线程的名字。接着定义当任务过多线程池无法接受过多任务会执行拒绝策略,这里只是简单的打印出被拒绝的任务。

ThreadFactory threadFactory = new ThreadFactory() {
    private AtomicLong mAtomicLong = new AtomicLong(0);

    @Override
    public Thread newThread(@NotNull Runnable r) {
        Thread thread = new Thread(r, "My-Thread-" + mAtomicLong.getAndIncrement());
        // 创建新线程的时候打印新线程名字
        System.out.println("Create new Thread(): " + thread.getName());
        return thread;
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 打印被拒绝的任务
        System.out.println("rejectedExecution" + r.toString());
    }
};

接着来为线程池配置任务队列,也就是在核心线程已经全部创建而且都在执行任务,这时又有新的任务被提交它们就会被放置在任务队列中,任务队列可以分为有界队列和*队列。

// 有界队列,最多能存放5个任务对象
BlockingQueue<Runnable> limitArray = new ArrayBlockingQueue<>(5);
BlockingQueue<Runnable> limitlist = new LinkedBlockingQueue<>(5);

// *队列可以存放Integer.MAX_VALUE个任务
BlockingQueue<Runnable> unlimitList = new LinkedBlockingQueue<>();

前面几个重要的参数都已经配置好,现在可以创建一个有3个核心线程,最多6个线程,线程保活时间30秒的线程池,使用它来提交不同数量的任务看它会有什么样的表现。

ExecutorService executorService = new ThreadPoolExecutor(3, 6,
                30, TimeUnit.SECONDS, limitArray, threadFactory, handler);

int size = 14; // 需要执行的任务数量
for (int i = 0; i < size; i++) {
    final int index = i;
    executorService.execute(new Runnable() {
        private int num = index;
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " -> " + "Current Task Num: #" + index);
        }

        @Override
        public String toString() {
            return "$classname{" +
                    "num=" + num +
                    '}';
        }
    });
}

executorService.shutdown();
try {
    executorService.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
    e.printStackTrace();
}

首先执行任务数小于核心线程数个的任务,查看任务的执行效果。

// size = 2;
Create new Thread(): My-Thread-0
Create new Thread(): My-Thread-1
My-Thread-0 -> Current Task Num: #0
My-Thread-1 -> Current Task Num: #1

当少于核心线程数的时候只创建了2个核心线程,每个线程执行一个任务。

// size = 5
Create new Thread(): My-Thread-0
Create new Thread(): My-Thread-1
Create new Thread(): My-Thread-2
My-Thread-0 -> Current Task Num: #0
My-Thread-2 -> Current Task Num: #2
My-Thread-1 -> Current Task Num: #1
// 过一段时间
My-Thread-0 -> Current Task Num: #3
My-Thread-2 -> Current Task Num: #4

当任务超过核心线程时会先创建核心线程并且为它们分配任务,多余的任务被放到任务队列中,在核心线程执行完之前的任务再执行任务队列中任务。

// size = 10
Create new Thread(): My-Thread-0
Create new Thread(): My-Thread-1
Create new Thread(): My-Thread-2
Create new Thread(): My-Thread-3
Create new Thread(): My-Thread-4
My-Thread-2 -> Current Task Num: #2
My-Thread-0 -> Current Task Num: #0
My-Thread-1 -> Current Task Num: #1
My-Thread-4 -> Current Task Num: #9
My-Thread-3 -> Current Task Num: #8
My-Thread-2 -> Current Task Num: #4
My-Thread-1 -> Current Task Num: #5
My-Thread-0 -> Current Task Num: #3
My-Thread-3 -> Current Task Num: #7
My-Thread-4 -> Current Task Num: #6

由于核心线程有3个,任务队列中有5个位置可以放,而一次提交了10个任务,就还有2个任务没有地方去,就会检查最大线程数是6个,还有3个线程可以创建,这时会创建少于最大线程数减去核心线程数的线程处理多余任务。

// size = 12
Create new Thread(): My-Thread-0
Create new Thread(): My-Thread-1
Create new Thread(): My-Thread-2
Create new Thread(): My-Thread-3
Create new Thread(): My-Thread-4
Create new Thread(): My-Thread-5
// 第11个任务被拒绝
rejectedExecution$classname{num=11}
My-Thread-3 -> Current Task Num: #8
My-Thread-2 -> Current Task Num: #2
My-Thread-1 -> Current Task Num: #1
My-Thread-0 -> Current Task Num: #0
My-Thread-5 -> Current Task Num: #10
My-Thread-4 -> Current Task Num: #9
My-Thread-1 -> Current Task Num: #3
My-Thread-0 -> Current Task Num: #6
My-Thread-2 -> Current Task Num: #5
My-Thread-3 -> Current Task Num: #4
My-Thread-5 -> Current Task Num: #7

当一次提交12个任务会先创建3个核心线程并处理3个任务,再将5个任务放到任务队列中,之后在创建3个线程使线程数达到最大线程6个,这时已经有11任务,最后一个任务被拒绝,之后非核心线程会在30秒后自动销毁退出。前面的测试都是针对有界任务队列而言的,如果是*任务队列按道理能够放下任意多个任务,永远不会创建非核心线程,现在使用*队列并提交多个任务看下执行效果。

// 任务队列换成*队列
ExecutorService executorService = new ThreadPoolExecutor(3, 6,
        30, TimeUnit.SECONDS, unlimitList, threadFactory, handler);

// size = 12
Create new Thread(): My-Thread-0
Create new Thread(): My-Thread-1
Create new Thread(): My-Thread-2
My-Thread-0 -> Current Task Num: #0
My-Thread-2 -> Current Task Num: #2
My-Thread-1 -> Current Task Num: #1
My-Thread-0 -> Current Task Num: #3
My-Thread-1 -> Current Task Num: #5
My-Thread-2 -> Current Task Num: #4
My-Thread-0 -> Current Task Num: #6
My-Thread-2 -> Current Task Num: #8
My-Thread-1 -> Current Task Num: #7
My-Thread-0 -> Current Task Num: #9
My-Thread-2 -> Current Task Num: #10
My-Thread-1 -> Current Task Num: #11

异常执行测试

前面执行的任务都是正常执行正常结束,如果在执行任务的过程中出现抛出运行时异常这时线程池内部又会如何处理呢,这里来在第3个任务执行的时候让它抛出空指针异常,看看会发生什么事情。

ExecutorService executorService = new ThreadPoolExecutor(3, 6,
        30, TimeUnit.SECONDS, limitArray, threadFactory, handler);

int size = 9;
for (int i = 0; i < size; i++) {
    final int index = i;
    // 这里使用的是execute,线程池里直接执行的Runnable对象,
    // 如果使用submit,由于会使用FutureTask包装Runnable,抛出的异常不会影响执行线程
    executorService.execute(new Runnable() {
        private int num = index;
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 如果是第三个任务,抛出空指针异常
            if (index == 2) {
                throw new NullPointerException();
            }
            System.out.println(Thread.currentThread().getName() + " -> " + "Current Task Num: #" + index);
        }

        @Override
        public String toString() {
            return "$classname{" +
                    "num=" + num +
                    '}';
        }
    });
}

实际运行之后会发现第二个任务由于抛出的运行时异常导致运行它的线程同样被强制退出,随后线程池又添加了新的线程用来执行之后的任务。

Create new Thread(): My-Thread-0
Create new Thread(): My-Thread-1
Create new Thread(): My-Thread-2
Create new Thread(): My-Thread-3
Exception in thread "My-Thread-2" java.lang.NullPointerException
My-Thread-1 -> Current Task Num: #1
My-Thread-0 -> Current Task Num: #0
    at com.example.ExceptionTest$3.run(ExceptionTest.java:46)
My-Thread-3 -> Current Task Num: #8
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
Create new Thread(): My-Thread-4
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
My-Thread-0 -> Current Task Num: #4
My-Thread-1 -> Current Task Num: #3
My-Thread-4 -> Current Task Num: #6
My-Thread-3 -> Current Task Num: #5
My-Thread-0 -> Current Task Num: #7

这里的execute由于是直接使用传入的Runnable对象来执行,出现异常之后会导致线程池中执行线程强制退出,如果使用submit那么传递的Callable或者Runnable对象会使用FutureTask包装,内部的任务如果抛出异常不会印象执行线程。

Create new Thread(): My-Thread-0
Create new Thread(): My-Thread-1
Create new Thread(): My-Thread-2
Create new Thread(): My-Thread-3
My-Thread-0 -> Current Task Num: #0
My-Thread-1 -> Current Task Num: #1
My-Thread-3 -> Current Task Num: #8
My-Thread-0 -> Current Task Num: #3
My-Thread-3 -> Current Task Num: #6
My-Thread-2 -> Current Task Num: #5
My-Thread-1 -> Current Task Num: #4
My-Thread-0 -> Current Task Num: #7
相关标签: ThreadPoolExecutor