欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

线程池4种常用方式实现以及自定义线程池原理

程序员文章站 2022-05-16 14:35:39
...

1.什么是线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序
都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。 当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。 线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用

2.常用的4种线程池

2.1分类

(1)newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
(2)newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
(3)newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
(4)newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

2.2实现

2.2.1 newCachedThreadPool

public class NewCacheThreadPoolTest {
    public static void main(String[] args) {
        // 无限大小线程池 jvm自动回收
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            newCachedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(50);
                    } catch (Exception e) {

                    }
                    System.out.println(Thread.currentThread().getName() + ",i:" + temp);

                }
            });
        }
        //关闭线程池
        newCachedThreadPool.shutdown();
    }
}

2.2.2 newFixedThreadPool

public class NewFixedThreadPoolTest {
    public static void main(String[] args) {
        // 固定大小的线程数
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            newFixedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(50);
                    } catch (Exception e) {

                    }
                    System.out.println(Thread.currentThread().getName() + ",i:" + temp);

                }
            });
        }
        newFixedThreadPool.shutdown();

    }
}

**2.2.3 newScheduledThreadPool **

public class NewScheduledThreadPoolTest {
    public static void main(String[] args) {
        // 固定大小的线程数
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            newScheduledThreadPool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "---i:" + temp);
                }
            }, 3, TimeUnit.SECONDS);
        }
        newScheduledThreadPool.shutdown();

    }
}

2.2.4 newSingleThreadExecutor

public class NewSingleThreadExecutorTest {
    public static void main(String[] args) {
        //单线程,主要用于依赖上一个结果的场景
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int temp = i;
            newSingleThreadExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "---i:" + temp);
                }
            });
        }
        newSingleThreadExecutor.shutdown();
    }
}

3.自定义线程池

阿里的 Java开发手册,上面有线程池的一个建议:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,
这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

点击ThreadPoolExecutor 源码

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

corePoolSize - 线程池核心池的大小。
maximumPoolSize - 线程池的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 的时间单位。
workQueue - 用来储存等待执行任务的队列。
threadFactory - 线程工厂。
handler - 拒绝策略。

关注点1 线程池大小
线程池有两个线程数的设置,一个为核心池线程数,一个为最大线程数。
在创建了线程池后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法
当创建的线程数等于 corePoolSize 时,会加入设置的阻塞队列。当队列满时,会创建线程执行任务直到线程池中的数量等于maximumPoolSize。

关注点2 适当的阻塞队列
java.lang.IllegalStateException: Queue full
方法 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列

PriorityBlockingQueue :一个支持优先级排序的*阻塞队列。
DelayQueue: 一个使用优先级队列实现的*阻塞队列。
SynchronousQueue: 一个不存储元素的阻塞队列。
LinkedTransferQueue: 一个由链表结构组成的*阻塞队列。
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。

关注点3 明确拒绝策略
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛RejectedExecutionException异常。 (默认)
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

自定义线程池原理图

线程池4种常用方式实现以及自定义线程池原理

实现实例

**
 * @author Administrator
 */
public class MyThreadPool {
    public static void main(String[] args) {
        /**
         * corePoolSize核心线程数 1
         * maximumPoolSize最大线程数 2
         * keepAliveTime保持活跃时间 0
         * capacity有界队列的大小 3
         */
        ThreadPoolExecutor threadPoolExecutor = new
                ThreadPoolExecutor(1,
                2,
                0L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3));
        /**
         * 任务1不小于核心线程数,创建线程执行
         */
        threadPoolExecutor.execute(new ThreadTest("任务1"));
        /**
         * 任务2大于核心线程数,但是队列未满,入列,队列为1
         */
        threadPoolExecutor.execute(new ThreadTest("任务2"));
        /**
         * 任务3大于核心线程数,但是队列未满,入列,队列为2
         */
        threadPoolExecutor.execute(new ThreadTest("任务3"));
        /**
         * 任务4大于核心线程数,但是队列未满,入列,队列为3,此时队列已满
         */
        threadPoolExecutor.execute(new ThreadTest("任务4"));
        /**
         * 任务5队列已满,但是没有大于最大线程数,重新创建线程执行
         */
        threadPoolExecutor.execute(new ThreadTest("任务5"));
        /**
         * 任务6队列已满,大于了最大线程数,拒绝任务执行
         */
        threadPoolExecutor.execute(new ThreadTest("任务6"));
        threadPoolExecutor.shutdown();
    }

}

class ThreadTest implements Runnable {
    private String threadName;

    public ThreadTest(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public void run() {
        System.out.println(threadName);
    }
}

四、合理配置线程池的大小

遵循两原则:
1、如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 CPU N+1
2、如果是IO密集型任务,参考值可以设置为2*N CPU
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。