线程池
//线程池: 只是按初始线程数执行任务,并没有创建新的工作者线程(take给阻塞了,影响吞吐量)
public class ThreadPool {
//任务队列
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
private int poolSize; //初始线程数
private int maxPoolSize; //最大线程数
private int size = 0; //线程数
//运行线程数:工作者
private List<Worker> works = new ArrayList<Worker>();
private boolean isshutdown = false; //线程池状态
private Timer timer;
public ThreadPool(int poolSize,int maxPoolSize) {
this.poolSize = poolSize;
this.maxPoolSize = maxPoolSize;
init();
}
//初始化一些工作者
private void init() {
Worker w = null;
for(int i=0;i<this.poolSize;i++) {
w = createThread();
works.add(w);
}
}
//创建工作者
private Worker createThread() {
Worker w = new Worker();
Thread t = new Thread(w);
t.setName("t" + size);
w.thread = t;
t.start();
size++;
return w;
}
private Worker createThread(Runnable task) {
Worker w = new Worker(task);
Thread t = new Thread(w);
t.setName("t" + size);
w.thread = t;
t.start();
size++;
return w;
}
//提交任务
public void execute(Runnable task) {
System.out.println("execute..");
try {
if(size <= poolSize) { //放入队列
queue.offer(task, 10, TimeUnit.SECONDS);
}else if(size<maxPoolSize) { //在新创建的线程中执行任务
Worker w = createThread();
works.add(w);
}else { //拒绝任务
reject(task);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch(Exception e) {
e.printStackTrace();
}
}
//拒绝任务
private void reject(Runnable task) throws Exception{
throw new Exception("无法提交");
}
//获取任务
public Runnable getTask() {
// if(!queue.isEmpty()) {
try {
return queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
// }
//没有任务,工作者线程自动终止
return null;
}
public void shutdown() {
this.isshutdown = true;
//终止工作者线程
//使用定时器检测线程池的任是否全部执行完,如果执行完就终止线程
timer = new Timer();
timer.schedule(new TimerTaskImpl(), 30*1000);
}
private class Worker implements Runnable{
private Runnable firstTask;
private Thread thread;
public Worker() {
}
public Worker(Runnable task) {
System.out.println("创建新的工作者线程..");
this.firstTask = task;
}
public void run() {
System.out.println("worker ..");
Runnable task = null;
if(firstTask != null) {
task = firstTask;
firstTask = null;
}
while(task!=null || (task=getTask())!=null) {
task.run();
System.out.println(thread.getName());
task = null;
}
}
//终止线程
public void interrupt() {
if(Thread.currentThread()!=thread) {
thread.interrupt();
}
}
}
//任务
static class Task implements Runnable{
static int i = 0;
public void run() {
System.out.println((i++) + " hello");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//定时任务:关闭工作者线程
class TimerTaskImpl extends TimerTask{
public void run() {
if(isshutdown && queue.isEmpty()) {
for(Worker w: works) {
w.interrupt();
}
timer.cancel();
}
System.out.println("定时器任务..");
}
}
public static void main(String[] args) {
// ThreadPool pool = new ThreadPool(1,2);
// for(int i=0;i<2;i++) {
// Task task = new Task();
// pool.execute(task);
// }
ThreadPool pool = new ThreadPool(3,10);
for(int i=0;i<20;i++) {
Task task = new Task();
pool.execute(task);
}
pool.shutdown();
}
}
上一篇: jdk自带的线程池