欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

多线程三(线程组和线程池)

程序员文章站 2022-05-11 10:09:59
线程组和线程池 一. 线程组 1. 线程组介绍及使用 Java使用ThreadGroup来表示线程组,它可以对一批线程进行分类管理,Java允许直接对线程组进行控制。对线程组的控制相当于控制这批线程。 在默认情况下,子线程和创建它的父线程同属于一个线程组。 一旦线程假如某个线程组之后,该线程将一直属 ......

线程组和线程池

一. 线程组

1. 线程组介绍及使用

Java使用ThreadGroup来表示线程组,它可以对一批线程进行分类管理,Java允许直接对线程组进行控制。对线程组的控制相当于控制这批线程。

在默认情况下,子线程和创建它的父线程同属于一个线程组。

一旦线程假如某个线程组之后,该线程将一直属于该线程组,知道该线程死亡,线程运行途中不能改变它所属的线程组。

Thread提供了不同构造器设置新创建的线程属于哪个线程组。提供getThreadGroup()方法返回该线程所属的线程组对象。

ThreadGroup类提供了如下两个构造器创建实例。

  • ThreadGroup(String name):以指定的线程组名字来创建新的线程组
  • ThreadGroup(ThreadGroup parent,String name):以指定的名字、指定的父线程组创建一个新线程组

Java程序不允许改线程组名字,通过getName()方法获取线程组名字。

ThreadGroup类提供了如下常用的方法

  • int activeCount():返回此线程组中活动的线程数目
  • interrupt():中断此线程组中的所有线程
  • isDaemon():判断该线程组是否是后台线程组
  • setDaemon(boolean daemon):把该线程组设置成后台线程组。
  • setMaxPriority(int pri):设置线程组的最高优先级。

2.线程组和异常处理机制

从Java 5开始,Java加强了线程的异常处理,如果线程执行过程中抛出了一个未处理异常,JVM在结束该线程之前会自动查找是否有对应的Thread.UncaughtExceptionHandler对象,如果找到该处理器对象,则会调用该对象的uncaughtException(Thread t,Throwable e)方法来处理该异常。

Thread类提供了两个方法设置异常处理器。

  • static setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):为该线程类的所有线程实例设置默认的异常处理器
  • setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh):为指定的线程实例设置异常处理器

ThreadGroup类实现了Thread.UncaughtExceptionHandler接口,所以每个线程所属的线程组将会作为默认的异常处理器。

如果线程执行过程中抛出了一个未处理异常,JVM在结束该线程之前会自动查找是否有对应的Thread.UncaughtExceptionHandler对象,如果找到该处理器对象,则会调用该对象的uncaughtException(Thread t,Throwable e)方法来处理该异常;否则,JVM会调用该线程所属的线程组对象的uncaughtException()方法来处理该异常。

线程组处理异常的流程如下:

  • 如果该线程组有父线程组,则调父线程组的uncaughtException()方法来处理该异常。
  • 如果该线线程实例所属的线程类有默认的异常处理器,那么调用该异常处理器来处理异常
  • 如果该对象是ThreadDeath对象,则不做任何处理;否则,将异常跟踪栈的信息打印到System.err错误输出流,并结束该线程。

下面主程序设置了异常处理器。

package com.gdut.thread;

class MyExHandler implements Thread.UncaughtExceptionHandler{
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println(t+"线程出现了异常"+e);
    }
}
public class ExHandler {
    public static void main(String[] args) {
        Thread.currentThread().setUncaughtExceptionHandler(new MyExHandler());
        int a=5/0;
        System.out.println("程序正常结束");
    }
}

输出:Thread[main,5,main]线程出现了异常java.lang.ArithmeticException: / by zero

二. 线程池

系统启动一个新线程的成本是非常高的,因为它涉及与操作系统交互。当程序中需要创建大量生存期很短暂的线程时,应该考虑使用线程池来提高系统性能。

与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,当序将一个Runnable对象或Callable对象创给线程池,线程池就会启动一个线程来执行他们的run()或call()方法,当run()或call()方法执行结束后,该线程并不会死亡,而是再次返回线程池中称为空闲状态,等待执行下一个Runnable对象的run()或call()方法。

除此之外,使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃。

2.1 Java 8改进的线程池

从Java 5开始,Java内建支持线程池。Java 5新增了一个Executors工厂类来生产线程池,包含如下静态方法生产不同的线程池。

  • newCachedThreadPool():创建一个具有缓存功能的线程池,系统根据需要创建线程,这些线程将被还存在线程池中
  • newFixdThreadPool(int nThread):创建一个可重用的,具有固定线程数的线程池
  • newSingleThreadExecutor():创建一个只有单线程的线程池,相当于newFixdThreadPool(int nThread)传入参数1。
  • newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。corePoolSize指池中所保存的线程数,即使线程是空闲的也被保存在线程池内
  • newSingleThreadledExecutor():创建一个线程的线程池,它可以在指定延迟后执行线程任务。
  • ExecutorService newWorkStealingPool(int parallelism):创建持有足够线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争。
  • ExecutorService newWorkStealingPool():该方法是前一个方法的简化版,他根据当前机器的CPU核数设置并行级别。

上面7个方法中的前三个方法返回一个ExecutorService对象,该对象代表一个线程池,它可以执行Runnable对象和Callable对象所代表的线程,中间两个方法返回一个ScheduledExecutorService线程池,它是ExecutorService的子类,它可以在指定延迟后执行线程任务,最后两个方法是Java 8新增的,他充分利用多CPU的并行能力。这两个方法生成的work stealing池,都相当于后台线程池,如果所有的前台线程都死亡了,work stealing池中的线程也会自动死亡。

ExecutorService对象的submit()方法可以执行Runnable对象或C对象代表的任务,返回Future对象,该对象代表call()方法的返回值。

ScheduledExecutorServic代表指定延迟后或周期性的执行任务的线程池,它提供了如下四个方法

  • ScheduledFuture<V> schedule(Callable<V> callble,long delay,TimeUnit unit):指定callable任务将在delay延迟后执行
  • ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit):指定command任务将在delay延迟后执行
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit):指定command任务将在delay延迟后以设定频率重复执行,也就是说,在initialDelay后开始执行,依次在initialDelay+period、initialDelay+period*2...重复执行。
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):创建并执行一个给定初始延迟后首次启动的定期操作,以后每一次执行终止和下一次执行的开始之间都存在给定的延迟。如果任务在某一次执行时遇到异常,就会取消后续执行;否则,只能通过程序显式取消或终止该任务。

用完一个线程池后,应该调用线程池的shutdown()方法启动线程池的关闭序列,调用shutdown()方法后的线程池不再接受任务,但会将以前所有已提交任务执行完成;另外也可以调用shutdownNow()方法关闭线程池,该方法试图停止所有在执行的活动任务,暂停处理正在等待执行的任务,并返回等待执行的任务列表。

使用线程池的任务步骤如下:

  1. 调用Executors类的静态工厂方法创建一个ExecutorService对象,该对象代表一个线程池
  2. 创建Runnable实现类或Callable实现类的实例,作为线程执行任务
  3. 调用ExecutorService对象的submit()方法来提交Runnable或Callable的实例代表的任务
  4. 当不想提交任何任务时,调用ExecutorService对象的shutdown()方法来关闭线程池。

实例如下:

package com.gdut.thread.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(6);
        Runnable runnable1 = ()->{
            for (int i = 0; i < 100; i++) {
                System.out.println(Thread.currentThread().getName()+" "+i);
            }
        };
        Runnable runnable2 = ()->{
            for (char i = 'a'; i < 'z'; i++) {
                System.out.println(Thread.currentThread().getName()+" "+i);
            }
        };
        pool.submit(runnable1);
        pool.submit(runnable2);
        pool.submit(runnable2);
        pool.shutdown();
    }
}

 

2.2 Java 8 增强的线程池

为了充分利用多CPU的优势、多核CPU的性能优势。可以考多个小任务,把小任务放到多个处理器核心上并行执行;当多个小任务执行完成之后,再将这些执行结果合并起来即可。Java 7提供了ForkJoinPool来支持这个功能。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。提供了如下两个常用的构造器

  • ForkJoinPool(int parallelism):创建一个包含parallelism个并行线程的ForkJoinPool.
  • ForkJoinPool():以Runtime.availableProssesors()方法的返回值作为paralelism参数来创建ForkJoinPool.

Java 8进一步拓展了ForkJoinPool的功能,Java 8增加了通用池功能。ForkJoinPool通过如下两个方法提供通用池功能。

  • ForkJoinPool commonPool():该方法返回一个通用池,通用池的状态不会受shutdown()或shutdownNow()方法的影响。
  • int getCommonPoolParallelism():该方法返回通用池的并行级别。

创建了通用池ForkJoinPool实例之后,就可调用ForkJoinPool的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法来执行指定任务了。其中,ForkJoinTask代表一个并行,合并的任务。

ForkJoinTask是一个抽象类,它还有两个抽象子类:RecursiveAction和recursiveTask。其中RecursiveAction代表没有返回值的任务,RecursiveTask代表有返回值的任务。

下面程序将一个大任务(打印0~500)的数值分成多个小任务,并将任务交给ForkJoinPool来执行。

package com.gdut.thread.threadPool;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

class PrintTask extends RecursiveAction{

    private static final int THRESHOLD = 50;
    private int start;
    private int end;
    public PrintTask(int start,int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    protected void compute() {
        if(end-start<THRESHOLD){
            for (int i = start; i <end ; i++) {
                System.out.println(Thread.currentThread().getName()+"的i值"+i);
            }
        }else{
            //当end与start的差大于THRESHOLD时,即要打印的数超过50时,将大任务分成两个小任务
            int middle = (end+start)/2;
            PrintTask left = new PrintTask(start,middle);
            PrintTask right = new PrintTask(middle,end);
            left.fork();
            right.fork();
        }
    }
}
public class ForkJoinPoolTest{
    public static void main(String[] args) throws InterruptedException{
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(new PrintTask(0,500));
        pool.awaitTermination(2, TimeUnit.SECONDS);
        pool.shutdown();
    }

}

多线程三(线程组和线程池)

8核计算机的执行效果

下面程序示范了使用RecursiveTask对一个长度为100的数组的元素值进行累加。

package com.gdut.thread.threadPool;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

class CalTask extends RecursiveTask<Integer>{

    private static final int THRESHOLD = 20;
    private int[] arr;
    private int start;
    private int end;

    public CalTask(int[] arr,int start,int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if(end-start<THRESHOLD){
            for (int i = start; i <end ; i++) {
                sum +=arr[i];
                System.out.println(Thread.currentThread().getName()+"加上"+arr[i]+"后的累加值为:"+sum);
            }
            return sum;
        }else{
            int middle = (end+start)/2;
            //将大任务分成两个小任务
            CalTask left = new CalTask(arr,start,middle);
            CalTask right = new CalTask(arr,middle,end);
            //执行两个小任务
            left.fork();
            right.fork();
            //将两个小任务累加的结果合并起来
           return left.join() + right.join();
        }

    }
}

public class Sum {

    public static void main(String[] args) throws Exception{
        int[] arr = new int[100];
        Random rand = new Random();
        int total = 0;
        for (int i = 0; i <100 ; i++) {
            int tmp = rand.nextInt(20);
            total += (arr[i] = tmp);
        }
        System.out.println(total);

        ForkJoinPool pool = new ForkJoinPool();
        Future future = pool.submit(new CalTask(arr,0,100));
        System.out.println(future.get());
        pool.shutdown();
    }
}

多线程三(线程组和线程池)