欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

线程池的选用与线程数的指定

程序员文章站 2022-03-04 10:51:26
线程池的选用与线程数的指定 1、选用的两个角度 高性能:将提交到线程池中的任务直接交给线程去处理(前提:线程数小于最大线程数),不入队 缓冲执行:希望提交到线程池的任务尽量被核...

线程池的选用与线程数的指定

1、选用的两个角度

高性能:将提交到线程池中的任务直接交给线程去处理(前提:线程数小于最大线程数),不入队 缓冲执行:希望提交到线程池的任务尽量被核心线程(corepoolsize)执行掉

2、高性能

队列:synchronousqueue 最大线程数:一般设为integer.max_value(整数最大值),防止回绝任务 典型案例:newcachedthreadpool 尤其适合于执行耗时短的任务

注意:

设置好闲置失效时间,keepalivetime,用于避免资源大量耗费 对于出现大量耗时长的任务,容易造成线程数迅速增加,这种情况要衡量使用该类线程池是否合适

3、缓冲执行

队列:linkedblockingqueue和arrayblockingqueue 典型案例:newfixedthreadpool(int threadsize)

注意:

使用该类线程池,最好使用linkedblockingqueue(*队列),但是当大量并发任务的涌入,导致核心线程处理不过来,队列元素会大量增加,可能会报内存溢出 当然,对于上边这种情况的话,如果是arrayblockingqueue的话,如果设置得当,可以回绝一些任务,而不报内存溢出

4、线程数的确定

公式:启动线程数=[任务执行时间/(任务执行时间-io等待时间)]*cpu核数

注意:

如果任务大都是cpu计算型任务,启动线程数=cpu核数 如果任务大多需要等待磁盘操作,网络响应,启动线程数可以参照公式估算,当然>cpu核数

总结:

一般使用线程池,按照如下顺序依次考虑(只有前者不满足场景需求,才考虑后者):

newcachedthreadpool-->newfixedthreadpool(int threadsize)-->threadpoolexecutor

newcachedthreadpool不需要指定任何参数 newfixedthreadpool需要指定线程池数(核心线程数==最大线程数) threadpoolexecutor需要指定核心线程数、最大线程数、闲置超时时间、队列、队列容量,甚至还有回绝策略和线程工厂

对于:newfixedthreadpool和threadpoolexecutor的核心数可以参照上述给出的公式进行估算。

本文主要讲一下synchronousqueue。

定义

synchronousqueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。

如果以洗盘子的比喻为例,那么这就相当于没有盘架,而是将洗好的盘子直接放入下一个空闲的烘干机中。这种实现队列的方式看似很奇怪,但由于可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟。(在传统的队列中,在一个工作单元可以交付之前,必须通过串行方式首先完成入列[enqueue]或者出列[dequeue]等操作。)

直接交付方式还会将更多关于任务状态的信息反馈给生产者。当交付被接受时,它就知道消费者已经得到了任务,而不是简单地把任务放入一个队列——这种区别就好比将文件直接交给同事,还是将文件放到她的邮箱中并希望她能尽快拿到文件。

因为synchronousqueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

实例

public class synchronousqueueexample {

    static class synchronousqueueproducer implements runnable {

        protected blockingqueue blockingqueue;
        final random random = new random();

        public synchronousqueueproducer(blockingqueue queue) {
            this.blockingqueue = queue;
        }

        @override
        public void run() {
            while (true) {
                try {
                    string data = uuid.randomuuid().tostring();
                    system.out.println("put: " + data);
                    blockingqueue.put(data);
                    thread.sleep(1000);
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
            }
        }

    }

    static class synchronousqueueconsumer implements runnable {

        protected blockingqueue blockingqueue;

        public synchronousqueueconsumer(blockingqueue queue) {
            this.blockingqueue = queue;
        }

        @override
        public void run() {
            while (true) {
                try {
                    string data = blockingqueue.take();
                    system.out.println(thread.currentthread().getname()
                            + " take(): " + data);
                    thread.sleep(2000);
                } catch (interruptedexception e) {
                    e.printstacktrace();
                }
            }
        }

    }

    public static void main(string[] args) {
        final blockingqueue synchronousqueue = new synchronousqueue();

        synchronousqueueproducer queueproducer = new synchronousqueueproducer(
                synchronousqueue);
        new thread(queueproducer).start();

        synchronousqueueconsumer queueconsumer1 = new synchronousqueueconsumer(
                synchronousqueue);
        new thread(queueconsumer1).start();

        synchronousqueueconsumer queueconsumer2 = new synchronousqueueconsumer(
                synchronousqueue);
        new thread(queueconsumer2).start();

    }
}

插入数据的线程和获取数据的线程,交替执行

应用场景

executors.newcachedthreadpool()

 /**
     * creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * threadfactory to create new threads when needed.
     * @param threadfactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws nullpointerexception if threadfactory is null
     */
    public static executorservice newcachedthreadpool(threadfactory threadfactory) {
        return new threadpoolexecutor(0, integer.max_value,
                                      60l, timeunit.seconds,
                                      new synchronousqueue(),
                                      threadfactory);
    }

由于threadpoolexecutor内部实现任务提交的时候调用的是工作队列(blockingqueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用synchronousqueue作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从synchronousqueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,threadpoolexecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。

所以,使用synchronousqueue作为工作队列,工作队列本身并不限制待执行的任务的数量。但此时需要限定线程池的最大大小为一个合理的有限值,而不是integer.max_value,否则可能导致线程池中的工作者线程的数量一直增加到资源所无法承受为止。

如果应用程序确实需要比较大的工作队列容量,而又想避免*工作队列可能导致的问题,不妨考虑synchronousqueue。synchronousqueue实现上并不使用缓存空间。

使用synchronousqueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。

doc

a guide to java synchronousqueue synchronousqueue example in java - produer consumer solution java synchronousqueue