java ThreadPoolExecutor
java ThreadPoolExecutor 学习笔记
/** * @param corePoolSize * 线程池中保持的线程数量 * @param maximumPoolSize * 线程池最大可开启线程数 * @param keepAliveTime * 当线程池中的线程总量大于保持的线程数量时, * 此参数设置其中空闲的线程将保持的时间数量 * @param unit * 保持的时间单位 * 当其中的线程有空闲超过这个时间的,就将此线程杀死, * 一直到线程池的保持线程量,此时间设置将不再起作用 * @param workQueue * 线程池的等待队列 * 当需要执行的线程大于保持的线程数量时,会将此线程加入到这里设置的等待队列中 * @param threadFactory * 线程工厂类 制定了此参数后可将Runnable实现类添加到execute方法中 * 缺省使用静态内部类DefaultThreadFactory * @param handler * 当线程的等待队列超出容量时 执行的处理程序 * 缺省使用静态内部类AbortPolicy */ public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){}
一、简单案例
1、Executors.newFixedThreadPool(int num)
特点:
固定线程池中可用线程数量,等待队列使用的是*链表队列LinkedBlockingQueue,
源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
简单例子如下所示:
public static void jdkExecutor(){ ExecutorService exe = Executors.newFixedThreadPool(3); for(int i=0;i<5;i++){ exe.execute(new Thread(new Runnable() { @Override public void run() { System.err.println(Thread.currentThread().getName()); } })); } } //使用的是 线程的缺省 DefaultThreadFactory(静态内部类)线程工厂 public static void jdkExecutor2(){ ExecutorService exe = Executors.newFixedThreadPool(3); for(int i=0;i<5;i++){ exe.execute(new Runnable() { @Override public void run() { System.err.println(Thread.currentThread().getName()); } }); } } //自定义线程工厂 public static void jdkExecutor3(){ ExecutorService exe = Executors.newFixedThreadPool(3, new JdkFactory()); for(int i=0;i<5;i++){ exe.execute(new Runnable() { @Override public void run() { System.err.println(Thread.currentThread().getName()); } }); } } //自定义工厂 public static class JdkFactory implements ThreadFactory{ @Override public Thread newThread(Runnable r) { return new Thread( r); } }
运行结果如下所示(有三个线程在执行):
pool-1-thread-1 pool-1-thread-3 pool-1-thread-2 pool-1-thread-3 pool-1-thread-1
2、Executors.newSingleThreadExecutor()
特点:
每一个任务都是使用同一个线程完成,等待队列使用的是*链表队列LinkedBlockingQueue,
源码:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
简单例子如下所示:
public static void jdkExecutor(){ ExecutorService exe = Executors.newSingleThreadExecutor(); for(int i=0;i<5;i++){ exe.execute(new Thread(new Runnable() { @Override public void run() { System.err.println(Thread.currentThread().getName()); } })); } } //线程工厂模式同上
运行结果如下所示:
pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1 pool-1-thread-1
3、ExecutorService newCachedThreadPool()
特点:
线程池中可用线程数量无限,等待队列使用的是同步队列SynchronousQueue,会有60秒时间等待是否让空闲线程关闭。
源码:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
简单案例如下所示:
public static void jdkExecutor(){ ExecutorService exe = Executors.newCachedThreadPool(); for(int i=0;i<10;i++){ exe.execute(new Thread(new Runnable() { @Override public void run() { System.err.println(Thread.currentThread().getName()); } })); } } //线程工厂模式同上
运行结果如下所示:
pool-1-thread-1 pool-1-thread-3 pool-1-thread-2 pool-1-thread-4 pool-1-thread-6 pool-1-thread-2 pool-1-thread-4 pool-1-thread-3 pool-1-thread-6 pool-1-thread-5
4、直接使用JDK ThreadPoolExecutor
简单案例如下所示:
public static void jdkBaseExecuter(){ ThreadPoolExecutor treadepool = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200), new ErrorHandler()); for(int i=0;i<10;i++){ treadepool.execute(new Runnable() { @Override public void run() { System.err.println(Thread.currentThread().getName()); } }); } } //RejectedExecutionHandler缺省使用内部类AbortPolicy private static class ErrorHandler implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r,ThreadPoolExecutor executor) { System.err.println("队列越界:"+executor.getQueue().size()); } }
运行结果如下所示:
pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-2 pool-1-thread-1 pool-1-thread-2 pool-1-thread-2 pool-1-thread-2 pool-1-thread-1 pool-1-thread-2
发现此时程序一直使用的是2个线程再执行任务,并未对保持线程数进行扩充。原因是因为此时有界缓存队列大小为200,未达到队列最大值,并不会重新开启新线程。将代码做如下改动:
ThreadPoolExecutor treadepool = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(6),new ErrorHandler());
运行结果将如下所示:
pool-1-thread-1 pool-1-thread-3 pool-1-thread-3 pool-1-thread-3 pool-1-thread-2 pool-1-thread-2 pool-1-thread-2 pool-1-thread-3 pool-1-thread-4 pool-1-thread-1
如果将有界队列值再缩小如下所示:
ThreadPoolExecutor treadepool = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(4),new ErrorHandler());
运行结果如下所示:
pool-1-thread-1 pool-1-thread-3 pool-1-thread-4 队列越界:4 pool-1-thread-2 pool-1-thread-4 pool-1-thread-4 pool-1-thread-3 pool-1-thread-1 pool-1-thread-2
就会出现队列越界错误,如果使用的是缺省ExecutionHandler将会初夏如下错误:
java.util.concurrent.RejectedExecutionException: Task xxxxxxx rejected from java.util.concurrent.ThreadPoolExecutor@56833a2e [Running, pool size = 4, active threads = 0, queued tasks = 0, completed tasks = 8] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
所以对线程池的配置要慎重。
5、ThreadPoolTaskExecutor spring线程池
简单案例如下所示:
public static void springExecutor(){ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //手动创建时需要进行初始化ThreadFactory 和RejectedExecutionHandler taskExecutor.initialize(); taskExecutor.setCorePoolSize(2); taskExecutor.setMaxPoolSize(4); taskExecutor.setQueueCapacity(200); for(int i=0;i<10;i++){ taskExecutor.execute(new Thread(new Runnable() { @Override public void run() { System.err.println(Thread.currentThread().getName()); } })); } }
当然ThreadPoolTaskExecutor 也可以在Spring的XML中进行定义,然后依赖注入进来,代码如下所示:
@Autowired private TaskExecutor taskExecutor; <bean id="taskExecutor" class= "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="2" /> <property name="maxPoolSize" value="4" /> <property name="queueCapacity" value="200" /> </bean> 注:在基于Spring容器时不需要手工进行initialize初始化,交由Spring容器初始化
运行结果如下所示:
ThreadPoolTaskExecutor-1 ThreadPoolTaskExecutor-1 ThreadPoolTaskExecutor-1 ThreadPoolTaskExecutor-1 ThreadPoolTaskExecutor-2 ThreadPoolTaskExecutor-1 ThreadPoolTaskExecutor-2 ThreadPoolTaskExecutor-1 ThreadPoolTaskExecutor-2 ThreadPoolTaskExecutor-1
具体的参数配置同上
注:以上的各种队列的不同点将于下篇进行说明
上一篇: JVM 调优参数总结
下一篇: Java heap space 问题查找