欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

线程池介绍(一)—基本使用

程序员文章站 2024-03-02 14:45:28
...

在将线程池之前,首先得了解其重要的接口java.util.concurrent.Executor,线程池有关的大部分类都是实现此接口。

先看看Executor接口的相关继承结构。


线程池介绍(一)—基本使用

绿色虚线为接口实现关系

蓝色虚线箭头为继承关系

    可以知道ExecutorService是Executor的子接口,此时还是不能实例化,它的唯一实现类AbstractExecutorService,由于是抽样类,故还是不能实例化,而接下里的实现类ThreadPoolExecutor是可以实例化的。


ThreadPoolExecutor介绍:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)

参数如下:

  • corePoolSize(线程池的基本大小):池中所保存的线程数,包括空闲的线程数。每当有一个任务被提交到线程池中,它就会创建一个线程来执行,哪怕还有空闲的,只要不大于核心池大小都会创建。如果调用了prestartAllCoreThreads方法,线程池就会提前创建并启动所有的基本线程。
  • maximumPoolSize(线程池最大大小):池中允许最大的线程数。
  • keepAliveTime(线程活动保持时间):当线程数量大于corePoolSize这个值时,如果超过了此时间单位,则删除
  • TimeUnit(线程活动保持时间的单位):上一个参数的时间单位,天(DAYS)、小时(HOURS)等等。
  • workQueue(任务队列):执行前用于保持任务的队列,该队列仅由execute方法提交的Runnable任务
  • ThreadFactory:设置创建线程工厂,通过线程工厂创建出来的线程做有需要的事情,如设置优先级和名字
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,那么应该采取一种策略处理提交的新任务。


WorkQueue的阻塞队列:

         1.  ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

        2. LinkedBlockingQueue:是一个*的,无缓存, 基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。

         3. SynchronousQueue:一个*的,不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,可以看做是一个缓存值为1的阻塞队列。吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。有公平模式和非公平模式的区别。

          4.  PriorityBlockingQueue:一个具有优先级的无限阻塞队列


RejectedExecutionHandler(饱和策略):

           当队列和线程池都满了,说明线程池处于饱和状态。

  1. AbortPolicy:直接抛出异常。(默认)
  2. CallerRunsPolicy:只用调用者所在线程来运行任务。
  3. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  4. DiscardPolicy:不处理,丢弃掉。
  5. 可以根据实际情况实现RejectedExecutionHandler接口自定义策略。

线程池工作方式(参数之间的关系):

1) A 代表当前运行的线程,即Executor提交Runnable的数量

2) B 代表corePoolSize

3)   C 代表maximumPoolSize

4) D 代表 A - B

5) E 代表LinkedBlockingQueue队列 ,这里是无参构造函数,说明的是大小没有限制,队列容量大。

6) F 代表SynchronousQueue队列

7) G 代表KeepAliceTimes

  • A <= B ,则Executor始终的选择添加新的线程,而不排队,其他参数忽略。
  • A > B , A <= C , E : C、G忽略,直接把D放在E中。
  • A > B , A <= C , F :   C、 G有效,马上创建这些线程运行任务,而不是把D放在F中,D执行完任务后在指定时间超过G则进行清除。
  • A > C , E :  C、G忽略,直接把D放入E中等待。
  • A > C,  F : 处理C的任务,其他任务不再处理抛出异常

总之:申请的小于core的,其他参数都忽略不计。会创建新的线程,既然池中有空闲的线程。

          申请的大于core且小于max的话如果用Linked..则此时运行的最大线程为core,其他的则在队列中等,由于池中运行的数量永远不会超过core,那么keeptime也没有意义了。如果用SynQueue的话,则不会放入队列中,而是直接在池中运行,此时此种运行的数量是超过core的,那么keeptime即有意义了,超过指定时间后将进行清除。

          申请的大于max的话。还是一样,如果使用linked.. 将大于的core的都放入队列中,池中永远最多能运行core个线程,即keeptime还是没用的。如果是SynQueue的话,那么池中运行的就是max个线程,超过max的线程将会默认抛出异常。

排队策略:

上述我一阵说的转成专业术语就叫排队策略了,通常是三种策略:

         1. 直接提交:工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。如果没有可以立即用的线程,它会构造个新的线程出来。

         2.*队列:不具有预定义容量的LinkedBlockingQueue,创建的线程就不会超过 corePoolSize。(max 的值也就无效了)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用*队列

         3.有界队列:当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。


如何用线程池:

简单的方法就是Executors,这个是工具类,(而ThreadPoolExecutor需要许多的参数)用于生成常见的线程池的方法:

  • 使用newCachedThreadPool()方法创建*线程池,使用的是SynchronousQueue这个队列。
public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                      60L, TimeUnit.SECONDS,  
                                      new SynchronousQueue<Runnable>());  
    }  
  • 使用newFixedThreadPool(int)方法创建有界线程池,使用的是LinkedBlockingQueue这个队列。
public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS, 
                                      new LinkedBlockingQueue<Runnable>());  
    }  
  • 使用newSingleThreadExecutor()方法创建单一线程池

1. *线程池,理论上是可以放Integer.max个的,会自动进行线程回收

public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();    
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    System.out.println("begin A" + System.currentTimeMillis());
                    Thread.sleep(5000);
                    System.out.println("A");
                    System.out.println("end A" + System.currentTimeMillis());
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    System.out.println("begin B" + System.currentTimeMillis());
                    Thread.sleep(1000);
                    System.out.println("B");
                    System.out.println("end B" + System.currentTimeMillis());
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        });

        //一次性用5个线程
        for(int i = 0; i < 5; i ++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("run "  + Thread.currentThread().getName());
                }
            });
        }
    }

如何看复用了线程池的线程:

public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i ++){
            executorService.execute(new MyRunner("i"));
        }
        Thread.sleep(1000);
        System.out.println(" ");

        for(int i = 0; i < 5; i ++){
            executorService.execute(new MyRunner(Integer.toString(i)));
        }
    }

 class MyRunner implements Runnable{
    
    private String username;

    public MyRunner(String username){
        super();
        this.username = username;
    }

    @Override
    public void run() {
       System.out.println(Thread.currentThread().getName() + "username = " + username + "begin" + System.currentTimeMillis());
        //Thread.sleep(2000);
       System.out.println(Thread.currentThread().getName() + "username = " + username + "end" + System.currentTimeMillis());
    }
}

*线程池还有一种构造方法,加ThreadFactory定制工厂,对线程进行修改,如该线程名字之类的。

public static void main(String[] args) {
        MyThreadFactry myThreadFactry = new MyThreadFactry();
        ExecutorService executorService = Executors.newCachedThreadPool(myThreadFactry);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });
    }

class MyThreadFactry implements ThreadFactory{

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("池子中线程的名称 " + Math.random());
        return thread;
    }
}

2. 有界线程池:可以指定池子里有多少个最大的数量

    此处定义了4个任务,只有2个线程的线程池

public static void main(String[] args) {
        MyThreadFactry myThreadFactry = new MyThreadFactry();
        ExecutorService executorService = Executors.newFixedThreadPool(2, myThreadFactry);
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try{
                    System.out.println("begin" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
                    Thread.sleep(2000);
                    System.out.println("end" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
                }catch (InterruptedException e ){
                    e.printStackTrace();
                }
            }
        };
        executorService.execute(run);
        executorService.execute(run);
        executorService.execute(run);
        executorService.execute(run);
    }

class MythreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setName("池子的线程名称" + Math.random());
         return thread;
    }
}

3. 单一线程池:就只有一个线程在执行

        创建3个任务,并命名,放入线程池中,可以看得到就只有一个线程在运行

public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for(int i = 0; i < 3; i++){
            executorService.execute(new MyRunner(Integer.toString(i)));
        }
    }

class MyThread implements Runnable{

    private String username;

    public MyThread(String username){
        super();
        this.username = username;
    }

    @Override
    public void run() {
        try {
           System.out.println(Thread.currentThread().getName() + ": begin " + System.currentTimeMillis() +
                 "username:" + username);
            Thread.sleep(1000);
           System.out.println(Thread.currentThread().getName() + ": end " + System.currentTimeMillis() + 
                "username:" + username);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}