欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java 线程池详解

程序员文章站 2024-03-11 11:16:31
系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包含大量并发的线程时,会导致系统性能剧烈下降,甚至导致jvm崩溃,而线程...

系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包含大量并发的线程时,会导致系统性能剧烈下降,甚至导致jvm崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过次数。

一、executors 工厂类用来产生线程池,该工厂类包含以下几个静态工厂方法来创建对应的线程池。创建的线程池是一个executorservice对象,使用该对象的submit方法或者是execute方法执行相应的runnable或者是callable任务。线程池本身在不再需要的时候调用shutdown()方法停止线程池,调用该方法后,该线程池将不再允许任务添加进来,但是会直到已添加的所有任务执行完成后才死亡。

1、newcachedthreadpool(),创建一个具有缓存功能的线程池,提交到该线程池的任务(runnable或callable对象)创建的线程,如果执行完成,会被缓存到cachedthreadpool中,供后面需要执行的任务使用。

import java.util.concurrent.executorservice;
import java.util.concurrent.executors;

public class cachethreadpool {
  static class task implements runnable {
    @override
    public void run() {
      system.out.println(this + " " + thread.currentthread().getname() + " allstacktraces map size: "
          + thread.currentthread().getallstacktraces().size());
    }
  }

  public static void main(string[] args) {
    executorservice cachethreadpool = executors.newcachedthreadpool();
    
    //先添加三个任务到线程池
    for(int i = 0 ; i < 3; i++) {
      cachethreadpool.execute(new task());
    }
    
    //等三个线程执行完成后,再次添加三个任务到线程池
    try {
      thread.sleep(3000);
    } catch (interruptedexception e) {
      e.printstacktrace();
    }
    
    for(int i = 0 ; i < 3; i++) {
      cachethreadpool.execute(new task());
    }
  }

}

执行结果如下:

cachethreadpool$task@2d312eb9 pool-1-thread-1 allstacktraces map size: 7
cachethreadpool$task@59522b86 pool-1-thread-3 allstacktraces map size: 7
cachethreadpool$task@73dbb89f pool-1-thread-2 allstacktraces map size: 7
cachethreadpool$task@5795cedc pool-1-thread-3 allstacktraces map size: 7
cachethreadpool$task@256d5600 pool-1-thread-1 allstacktraces map size: 7
cachethreadpool$task@7d1c5894 pool-1-thread-2 allstacktraces map size: 7

线程池中的线程对象进行了缓存,当有新任务执行时进行了复用。但是如果有特别多的并发时,缓存线程池还是会创建很多个线程对象。

2、newfixedthreadpool(int nthreads) 创建一个指定线程个数,线程可复用的线程池。

import java.util.concurrent.executorservice;
import java.util.concurrent.executors;

public class fixedthreadpool {
  static class task implements runnable {
    @override
    public void run() {
      system.out.println(this + " " + thread.currentthread().getname() + " allstacktraces map size: "
          + thread.currentthread().getallstacktraces().size());
    }
  }

  public static void main(string[] args) {
    executorservice fixedthreadpool = executors.newfixedthreadpool(3);

    // 先添加三个任务到线程池
    for (int i = 0; i < 5; i++) {
      fixedthreadpool.execute(new task());
    }

    // 等三个线程执行完成后,再次添加三个任务到线程池
    try {
      thread.sleep(3);
    } catch (interruptedexception e) {
      e.printstacktrace();
    }

    for (int i = 0; i < 3; i++) {
      fixedthreadpool.execute(new task());
    }
  }

}

执行结果:

fixedthreadpool$task@7045c12d pool-1-thread-2 allstacktraces map size: 7
fixedthreadpool$task@50fa0bef pool-1-thread-2 allstacktraces map size: 7
fixedthreadpool$task@ccb1870 pool-1-thread-2 allstacktraces map size: 7
fixedthreadpool$task@7392b4e3 pool-1-thread-1 allstacktraces map size: 7
fixedthreadpool$task@5bdeff18 pool-1-thread-2 allstacktraces map size: 7
fixedthreadpool$task@7d5554e1 pool-1-thread-1 allstacktraces map size: 7
fixedthreadpool$task@24468092 pool-1-thread-3 allstacktraces map size: 7
fixedthreadpool$task@fa7b978 pool-1-thread-2 allstacktraces map size: 7

3、newsinglethreadexecutor(),创建一个只有单线程的线程池,相当于调用newfixedthreadpool(1)

4、newsheduledthreadpool(int corepoolsize),创建指定线程数的线程池,它可以在指定延迟后执行线程。也可以以某一周期重复执行某一线程,知道调用shutdown()关闭线程池。

示例如下:

import java.util.concurrent.executors;
import java.util.concurrent.scheduledexecutorservice;
import java.util.concurrent.timeunit;

public class scheduledthreadpool {
  static class task implements runnable {
    @override
    public void run() {
      system.out.println("time " + system.currenttimemillis() + " " + thread.currentthread().getname() + " allstacktraces map size: "
          + thread.currentthread().getallstacktraces().size());
    }
  }

  public static void main(string[] args) {
    scheduledexecutorservice scheduledexecutorservice = executors.newscheduledthreadpool(3);
    
    scheduledexecutorservice.schedule(new task(), 3, timeunit.seconds);
    
    scheduledexecutorservice.scheduleatfixedrate(new task(), 3, 5, timeunit.seconds);
  
    try {
      thread.sleep(30 * 1000);
    } catch (interruptedexception e) {
      e.printstacktrace();
    }
    scheduledexecutorservice.shutdown();
  }

}

运行结果如下:

time 1458921795240 pool-1-thread-1 allstacktraces map size: 6
time 1458921795241 pool-1-thread-2 allstacktraces map size: 6
time 1458921800240 pool-1-thread-1 allstacktraces map size: 7
time 1458921805240 pool-1-thread-1 allstacktraces map size: 7
time 1458921810240 pool-1-thread-1 allstacktraces map size: 7
time 1458921815240 pool-1-thread-1 allstacktraces map size: 7
time 1458921820240 pool-1-thread-1 allstacktraces map size: 7

由运行时间可看出,任务是按照5秒的周期执行的。

5、newsinglethreadscheduledexecutor() 创建一个只有一个线程的线程池,同调用newscheduledthreadpool(1)。

二、forkjoinpool和forkjointask

forkjoinpool是executorservice的实现类,支持将一个任务划分为多个小任务并行计算,在把多个小任务的计算结果合并成总的计算结果。它有两个构造函数

forkjoinpool(int parallelism)创建一个包含parallelism个并行线程的forkjoinpool。

forkjoinpool(),以runtime.availableprocessors()方法返回值作为parallelism参数来创建forkjoinpool。

forkjointask 代表一个可以并行,合并的任务。它是实现了future<t>接口的抽象类,它有两个抽象子类,代表无返回值任务的recuriveaction和有返回值的recursivetask。可根据具体需求继承这两个抽象类实现自己的对象,然后调用forkjoinpool的submit 方法执行。

recuriveaction 示例如下,实现并行输出0-300的数字。

import java.util.concurrent.forkjoinpool;
import java.util.concurrent.recursiveaction;
import java.util.concurrent.timeunit;

public class actionforkjointask {
  static class printtask extends recursiveaction {
    private static final int threshold = 50;
    private int start;
    private int end;

    public printtask(int start, int end) {
      this.start = start;
      this.end = end;
    }

    @override
    protected void compute() {
      if (end - start < threshold) {
        for(int i = start; i < end; i++) {
          system.out.println(thread.currentthread().getname() + " " + i);
        }
      } else {
        int middle = (start + end) / 2;
        printtask left = new printtask(start, middle);
        printtask right = new printtask(middle, end);
        left.fork();
        right.fork();
      }
    }

  }

  public static void main(string[] args) {
    forkjoinpool pool = new forkjoinpool();
    
    pool.submit(new printtask(0, 300));
    try {
      pool.awaittermination(2, timeunit.seconds);
    } catch (interruptedexception e) {
      e.printstacktrace();
    }
    
    pool.shutdown();
  }

}

在拆分小任务后,调用任务的fork()方法,加入到forkjoinpool中并行执行。

recursivetask示例,实现并行计算100个整数求和。拆分为每20个数求和后获取结果,在最后合并为最后的结果。

import java.util.random;
import java.util.concurrent.executionexception;
import java.util.concurrent.forkjoinpool;
import java.util.concurrent.future;
import java.util.concurrent.recursivetask;

public class taskforkjointask {
  static class caltask extends recursivetask<integer> {
    private static final int threshold = 20;

    private int arr[];
    private int start;
    private int end;

    public caltask(int[] arr, int start, int end) {
      this.arr = arr;
      this.start = start;
      this.end = end;
    }

    @override
    protected integer compute() {
      int sum = 0;

      if (end - start < threshold) {
        for (int i = start; i < end; i++) {
          sum += arr[i];
        }
        system.out.println(thread.currentthread().getname() + " sum:" + sum);
        return sum;
      } else {
        int middle = (start + end) / 2;
        caltask left = new caltask(arr, start, middle);
        caltask right = new caltask(arr, middle, end);

        left.fork();
        right.fork();

        return left.join() + right.join();
      }
    }

  }

  public static void main(string[] args) {
    int arr[] = new int[100];
    random random = new random();
    int total = 0;

    for (int i = 0; i < arr.length; i++) {
      int tmp = random.nextint(20);
      total += (arr[i] = tmp);
    }
    system.out.println("total " + total);

    forkjoinpool pool = new forkjoinpool(4);

    future<integer> future = pool.submit(new caltask(arr, 0, arr.length));
    try {
      system.out.println("cal result: " + future.get());
    } catch (interruptedexception e) {
      e.printstacktrace();
    } catch (executionexception e) {
      e.printstacktrace();
    }
    pool.shutdown();
  }

}

执行结果如下:

total 912
forkjoinpool-1-worker-2 sum:82
forkjoinpool-1-worker-2 sum:123
forkjoinpool-1-worker-2 sum:144
forkjoinpool-1-worker-3 sum:119
forkjoinpool-1-worker-2 sum:106
forkjoinpool-1-worker-2 sum:128
forkjoinpool-1-worker-2 sum:121
forkjoinpool-1-worker-3 sum:89
cal result: 912

子任务执行完后,调用任务的join()方法获取子任务执行结果,再相加获得最后的结果。