欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

《Java并发编程实践》笔记4——线程池高级

程序员文章站 2022-07-26 17:15:13
作者:chjttony 1.估算线程池最优大小: Ncpu = CPU的数量 = Runtime.getRuntime().availableProcessors(); Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1; W/C = 等待时间与计算时间的比率; 为了保持处理器达到期望 ......

作者:chjttony

1.估算线程池最优大小:

Ncpu = CPU的数量 = Runtime.getRuntime().availableProcessors();

Ucpu = 目标CPU的使用率, 0 <= Ucpu <= 1;

W/C = 等待时间与计算时间的比率;

为了保持处理器达到期望的使用率,最优的线程池大小等于:

Nthreads = Ncpu * Ucpu * (1+ W/C);

 

2.配置ThreadPoolExecutor:

ThreadPoolExecutor是Executors中工厂方法newCachedThreadPool、newFixedThreadPool和newScheduledThreadExecutor返回的ExecutorService接口的基本实现,提供了默认的执行策略。

ThreadPoolExecutor还允许通过构造方法定制执行策略,常用构造方法如下:

public ThreadPoolExecutor(int corePoolSize,   
                            int maximumPoolSize,   
                            long keepAliveTime,  
                            TimeUnit unit,   
                            BlockingQueue<Runnable> workQueue,  
                            ThreadFactory threadFactory,      
                            RejectedExecutionHandler handler){......}  

ThreadPoolExecutor配置执行策略的参数如下:

(1).核心池大小:

核心池大小是目标线程池大小,线程池实现试图维护池的大小,即使没有任务执行,池的大小也等于核心池的大小,并且直到工作队列充满前池都不会创建更多的线程。

如果核心池大小设置为0,则在工作队列填满之前线程池不会开始工作;若有一个有限的线程池和一个有限的工作队列,同时又希望所有的线程在没有任务的情况下销毁,则可以将核心线程池大小设置为0来激活这个特性,newCachedThreadPool的核心池大小就是0。

 

(2).最大池大小:

最大池大小是可同时活动的线程数的上限,若空闲时,会销毁超出核心池大小的多余线程。

newCachedThreadPool将最大池设置为Integer.MAX_VALUE。

 

(3).存活时间:

即空闲线程超时时间,若一个线程已经闲置的时间超过了存活时间,它将成为一个被回收的候选者,若当前的池大小已经超过了核心池大小,这个线程会被终止掉。

newFixedThreadPool为请求的线程池设置了核心池和最大池的大小,而且存活时间永远不会超时。

newCachedThreadPool默认的超时时间为1分钟。

 

(4).任务阻塞队列:

当请求到达速度超了线程池处理速度时,线程池将尚未开始执行的任务放入任务阻塞队列中等待。

任务阻塞队列通常有以下3中:

A.无限队列:

newFixedThreadPool和newSingleThreadExecutor默认使用一个无限的LinkedBlockingQueue阻塞队列,若所有工作者线程都处于忙碌,任务将会在队列中等候,若任务持续快速地到达,队列也会无限制地增加。

 

B.有限队列:

为了避免资源被耗尽,线程池也经常使用有限队列,如ArrayBlockingQueue或有界的LinkedBlockingQueue;当队列满时,新来的任务会使用饱和策略处理。

 

C.同步移交:

同步移交完全绕开队列,直接将任务从生产者移交给工作者线程。

同步移交适合线程池无限或者可以接受任务被拒绝的情况,newCachedThreadPool就使用同步移交方式。

注意:只有任务彼此独立时,使用有限线程池或有限工作队列才是合理的,若任务之间相互依赖,有限的线程池或工作队列就可能引起线程的饥饿死锁,而使用无限的线程池配置(newCachedThreadPool)可以避免任务相互依赖引起的线程饥饿死锁。

 

(5).饱和策略:

饱和策略用于规定有界队列充满或线程池被关闭时对新提交任务的处理方案。

JDK提供了如下4中饱和策略:

A.中止(abort):

默认的饱和策略,会抛出未检查的拒绝执行异常RejectedExecutionException,调用者捕获该异常做适当处理。

 

B.遗弃(discard):

丢弃最新提交的任务。

 

C.遗弃最旧的(discard-oldest):

丢弃本应该接下来就要执行的任务,并尝试去重新提交新任务,若是优先级队列,则丢弃优先级最高的任务,因此不能混合使用遗弃最旧的饱和策略和优先级队列。

 

D.调用者运行(caller-runs):

既不会丢最新提交的任务,也不会抛出异常,会把最新提交的任务推回到调用者,由生产者线程执行,一方面给工作者线程时间来追赶进度,另一方面减慢了生产者提交新任务的速度。

通过调节核心池大小和存活时间,可以促进线程池归还空闲线程占用的资源,饱和策略用于应对过载处理。

 

3.定制线程工厂:

线程池通过线程工厂来创建线程,默认的线程池工厂会创建一个新的,非后台的线程,没有特殊配置。

线程池工厂可以运行定制线程的配置信息,例如:为线程池指定UncaughtExceptionHandler,用于捕获线程抛出的未检查异常以防止线程泄露;实例化一个定制的线程类实例,用来执行调试日志线程;给线程指定名称等等。

线程池工厂只有一个newThread方法,在线程池需要创建一个新线程时使用,例子代码如下:

public class MyThreadFactory implements ThreadFactory{  
    private final String poolName;  
      
    public MyThreadFactory(String poolName){  
        this.poolName = poolName;  
    }  
      
    public Thread newThread(Runnable runnable){  
        return new MyAppThread(runnable, poolName);  
    }  
}  
  
public class MyAppThread extends Thread{  
    public static final String DEFAULT_NAME = “MyAppThread”;  
    private static volatile boolean debug = false;  
    private static final AtomicInteger created = new AtomicInteger();  
    private static final AtomicInteger alive = new AtomicInteger();  
    private static final Logger log = Logger.getLogger(MyAppThread.class.getClassName());  
      
    public MyAppThread(Runnable r){  
        this(r, DEFAULT_NAME);  
    }  
  
    public MyAppThread(Runnable r, String name){  
        super(runnable, name + “-” + created.incrementAndGet());  
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){  
            public void uncaughtException(Thread t, Throwable e){  
                log.log(Level.SEVER, “Uncaught in thread ” + t.getName(), e);  
            }  
        });  
    }  
      
    public void run(){  
        if(debug){  
            log.log(Level.FINE, “Created ” + t.getName());  
        }  
        try{  
            alive.incrementAndGet();  
            super.run();  
        }finally{  
            alive.decrementAndGet();  
            if(debug){  
                log.log(Level.FINE, “Exiting ” + t.getName());  
            }  
        }  
    }  
  
    public static int getThreadsCreated(){  
        return created.get();  
    }  
  
    public static int getThreadsAlive(){  
        return alive.get();  
    }  
  
    public static boolean isDebug(){  
            return debug;  
    }  
  
    public static void setDebug(boolean debug){  
        this.debug = debug;  
    }  
}  

4.扩展ThreadPoolExecutor:

ThreadPoolExecutor提供了以下3个生命周期的钩子方法让子类扩展:

(1).beforeExecute:

任务执行前,线程会调用该方法,可以用来添加日志、监控或者信息收集统计。

若beforeExcute方法抛出了RuntimeException,线程的任务将不被执行,afterExecute方法也不会被调用。

 

(2).afterExecute:

任务执行结束后,线程会调用该方法,可以用来添加日志、监控或者信息收集统计。

无论任务正常返回或者抛出异常(抛出Error不能被调用),该方法都会被调用。

 

(3).terminate:

线程池完成关闭动作后调用,可以用来释放资源、发出通知、记录日志或者完成统计信息等。

一个扩展ThreadPoolExecutor的例子代码如下:

public class TimingThreadPool extends ThreadPoolExecutor{  
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();  
    private final Logger log = Logger.getLogger(TimingThreadPool.class.getClassName());  
    private final AtomicLong numTasks = new AtomicLong();  
    private final AtomicLong totalTime = new AtomicLong();  
  
    protected void beforeExecute(Thread t, Runnable r){  
        super.beforeExecute(t, r);  
        log.fine(String.format(“Thread %s: start %s”, t, r));  
        startTime.set(System.nanoTime());  
    }  
  
    protected void afterExecute(Runnable r, Throwable t){  
        try{  
            long endTime = System.nanoTime();  
            long taskTime = endTime - startTime.get();  
            numTasks.incrementAndGet();  
            totalTime.addAndGet(taskTime);  
            log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));  
        }finally{  
            super.afterExecute(r, t);  
        }  
    }  
  
    protected void terminated(){  
        try{  
            log.info(String.format("Terminated: avg time=%dns",   
                    totalTime.get() / numTasks.get()));  
        }finally{  
            super.terminated();  
        }  
    }  
}  

5.GUI单线程化:

几乎所有的GUI工具集都实现为单线程化子系统,即所有GUI的活动都被限制在一个单独的线程中。早期的GUI应用程序GUI事件在主事件循环中进行处理;现代的GUI框架使用了一个专门的事件派发线程(EDT)来处理GUI事件。

GUI被设计为单线程化的原因为:

(1).用户与应用程序的事件存在锁顺序死锁问题:

GUI程序中,事件是以冒泡方式进行传递的,以用户点击修改界面组件背景颜色为例:

A.用户发起的动作以冒泡方式从操作系统传递给应用程序:

首先,操作系统检测到一次鼠标点击;

其次,操作系统工具集把它转化为鼠标点击事件;

最后,操作系统将点击事件转发给应用程序GUI对象的监听器;


《Java并发编程实践》笔记4——线程池高级

 

B.应用程序发起的动作会以冒泡方式传回操作系统:

首先,应用程序GUI监听器发起改变组件背景颜色动作;

其次,GUI工具集把事件动作转发给特定组件类;

最后,组件类把发事件转发给操作系统进行渲染;

《Java并发编程实践》笔记4——线程池高级

 

上述两种动作以完全相反的顺访问相同的GUI对象,为了保证每个对象的线程安全,若每一层都使用一个锁加锁的话,这一系列的锁顺序不一致会导致锁顺序死锁问题,而如果每一层都共用一个锁的话,那就跟单线程没区别了。

(2).MVC模式的死锁问题:

目前MVC(模型-视图-控制器)模式在GUI开发中普遍流行,它把用户的交互分拨到模型、视图和控制器之间的协作中,极大地简化了GUI应用程序的实现,但是也再次面临不一致的锁顺序死锁问题:

控制器调用模型,模型通知视图已经发生了一些事情:

《Java并发编程实践》笔记4——线程池高级

 

控制器同样可以调用视图,视图调用模型来查询模型状态:

《Java并发编程实践》笔记4——线程池高级

 

锁顺序死锁问题让多线程的GUI程序极不稳定,bug众多且难以调试,因此GUI工具集都是单线程化。