欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java ThreadPoolExecutor

程序员文章站 2022-07-13 16:45:17
...

java ThreadPoolExecutor 学习笔记

/**
  * @param corePoolSize
  *        线程池中保持的线程数量
  * @param maximumPoolSize
  *        线程池最大可开启线程数
  * @param keepAliveTime
  *        当线程池中的线程总量大于保持的线程数量时,
  *        此参数设置其中空闲的线程将保持的时间数量
  * @param unit
  *        保持的时间单位
  *        当其中的线程有空闲超过这个时间的,就将此线程杀死,
  *        一直到线程池的保持线程量,此时间设置将不再起作用
  * @param workQueue
  *        线程池的等待队列
  *        当需要执行的线程大于保持的线程数量时,会将此线程加入到这里设置的等待队列中
  * @param threadFactory
  *        线程工厂类 制定了此参数后可将Runnable实现类添加到execute方法中
  *        缺省使用静态内部类DefaultThreadFactory
  * @param handler
  *        当线程的等待队列超出容量时  执行的处理程序
  *        缺省使用静态内部类AbortPolicy
  */
  public ThreadPoolExecutor(
      int corePoolSize,
      int maximumPoolSize,
      long keepAliveTime,
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory,
      RejectedExecutionHandler handler){}

 一、简单案例

1、Executors.newFixedThreadPool(int num)

 特点:

固定线程池中可用线程数量,等待队列使用的是*链表队列LinkedBlockingQueue,

源码:

 public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }

简单例子如下所示:

public static void jdkExecutor(){
	ExecutorService exe = Executors.newFixedThreadPool(3);
	for(int i=0;i<5;i++){
		exe.execute(new Thread(new Runnable() {
			@Override
			public void run() {
			System.err.println(Thread.currentThread().getName());
			}
		}));
	}
}
//使用的是  线程的缺省 DefaultThreadFactory(静态内部类)线程工厂
public static void jdkExecutor2(){
	ExecutorService exe = Executors.newFixedThreadPool(3);
	for(int i=0;i<5;i++){
		exe.execute(new Runnable() {
			@Override
			public void run() {
			System.err.println(Thread.currentThread().getName());
			}
		});
	}
}
//自定义线程工厂
public static void jdkExecutor3(){
	ExecutorService exe = Executors.newFixedThreadPool(3,
        new JdkFactory());
	for(int i=0;i<5;i++){
		exe.execute(new Runnable() {
			@Override
			public void run() {
			System.err.println(Thread.currentThread().getName());
			}
		});
	}
}
//自定义工厂
public static class JdkFactory implements ThreadFactory{
	@Override
	public Thread newThread(Runnable r) {
		return new Thread( r);
	}
}

 运行结果如下所示(有三个线程在执行):

pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1

 2、Executors.newSingleThreadExecutor()

特点:

每一个任务都是使用同一个线程完成,等待队列使用的是*链表队列LinkedBlockingQueue,

源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

 简单例子如下所示:

 

public static void jdkExecutor(){
	ExecutorService exe = Executors.newSingleThreadExecutor();
	for(int i=0;i<5;i++){
		exe.execute(new Thread(new Runnable() {
			@Override
			public void run() {
		        System.err.println(Thread.currentThread().getName());
			}
	        }));
	}
}
//线程工厂模式同上

运行结果如下所示:

 

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

 

3、ExecutorService newCachedThreadPool()

特点:

线程池中可用线程数量无限,等待队列使用的是同步队列SynchronousQueue,会有60秒时间等待是否让空闲线程关闭。

源码:

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 简单案例如下所示:

public static void jdkExecutor(){
	ExecutorService exe = Executors.newCachedThreadPool();
	for(int i=0;i<10;i++){
		exe.execute(new Thread(new Runnable() {
			@Override
			public void run() {
			System.err.println(Thread.currentThread().getName());
			}
		}));
	}
}
//线程工厂模式同上

 运行结果如下所示:

pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4
pool-1-thread-6
pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-6
pool-1-thread-5

 4、直接使用JDK ThreadPoolExecutor

 简单案例如下所示:

public static void jdkBaseExecuter(){
	ThreadPoolExecutor treadepool = new ThreadPoolExecutor(2, 4,60,
        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200),
        new ErrorHandler());
	for(int i=0;i<10;i++){
		treadepool.execute(new Runnable() {
			@Override
			public void run() {
			   System.err.println(Thread.currentThread().getName());
			}
		});
	}
}
//RejectedExecutionHandler缺省使用内部类AbortPolicy
private static class ErrorHandler implements RejectedExecutionHandler{
	@Override
	public void rejectedExecution(Runnable r,ThreadPoolExecutor executor) {
		System.err.println("队列越界:"+executor.getQueue().size());
	}
}

 运行结果如下所示:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2

 发现此时程序一直使用的是2个线程再执行任务,并未对保持线程数进行扩充。原因是因为此时有界缓存队列大小为200,未达到队列最大值,并不会重新开启新线程。将代码做如下改动:

ThreadPoolExecutor treadepool = new ThreadPoolExecutor(2, 4,60,
TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(6),new ErrorHandler());

 运行结果将如下所示:

pool-1-thread-1
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1

 如果将有界队列值再缩小如下所示:

ThreadPoolExecutor treadepool = new ThreadPoolExecutor(2, 4,60,
TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(4),new ErrorHandler());

 运行结果如下所示:

pool-1-thread-1
pool-1-thread-3
pool-1-thread-4
队列越界:4
pool-1-thread-2
pool-1-thread-4
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-2

 就会出现队列越界错误,如果使用的是缺省ExecutionHandler将会初夏如下错误:

java.util.concurrent.RejectedExecutionException: 
Task xxxxxxx rejected from java.util.concurrent.ThreadPoolExecutor@56833a2e
[Running, pool size = 4, active threads = 0, queued tasks = 0, completed tasks = 8]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
 

 所以对线程池的配置要慎重。

5、ThreadPoolTaskExecutor spring线程池

简单案例如下所示:

public static void springExecutor(){
	ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //手动创建时需要进行初始化ThreadFactory 和RejectedExecutionHandler
	taskExecutor.initialize();
	taskExecutor.setCorePoolSize(2);
	taskExecutor.setMaxPoolSize(4);
	taskExecutor.setQueueCapacity(200);
	for(int i=0;i<10;i++){
		taskExecutor.execute(new Thread(new Runnable() {		
			@Override
			public void run() {
				System.err.println(Thread.currentThread().getName());
			}
		}));
	}
}

 当然ThreadPoolTaskExecutor 也可以在Spring的XML中进行定义,然后依赖注入进来,代码如下所示:

@Autowired
private TaskExecutor taskExecutor;
<bean id="taskExecutor" class=
"org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
		<property name="corePoolSize" value="2" />
		<property name="maxPoolSize" value="4" />
		<property name="queueCapacity" value="200" />
</bean>
注:在基于Spring容器时不需要手工进行initialize初始化,交由Spring容器初始化

 运行结果如下所示:

ThreadPoolTaskExecutor-1
ThreadPoolTaskExecutor-1
ThreadPoolTaskExecutor-1
ThreadPoolTaskExecutor-1
ThreadPoolTaskExecutor-2
ThreadPoolTaskExecutor-1
ThreadPoolTaskExecutor-2
ThreadPoolTaskExecutor-1
ThreadPoolTaskExecutor-2
ThreadPoolTaskExecutor-1

 具体的参数配置同上

 注:以上的各种队列的不同点将于下篇进行说明