如何手写一个线程池(附线程池的用法)
设计一个线程池
首先,我们要设计线程池的目的是为了管理线程,所以它的核心就是提供线程来处理我们提交的任务。在设计之前我们先来思考一下几个问题:
1、初始化的时候就创建线程还是提交任务的时候创建线程?
2、假设我们提交的任务数大于了最大线程数怎么处理?直接抛弃?还是存放到队列里?
3、这个队列有没有上限?如果存放到队列里队列达到了上限怎么办?
1)直接抛弃?
2)抛异常不让客户端提交?
3)阻塞,等队列有空位再存这个任务?
4)阻塞一段时间,时间过了队列还没有空位存就放弃提交任务?
1、我们当然希望有任务才去创建线程,因为如果这个线程池没有提交任务就创建了线程,那么这些线程就会空转或者睡眠,造成浪费。
2、当客户端提交的任务大于线程池的最大线程数maximumPoolSize时,我们也不应该把这个任务直接丢弃,因为线程池中的线程可能很快就把当前任务执行完。所以比较好的方式就是用个队列存储起来,当线程执行完了可以立马从线程池里面拿到任务执行。其实为了设计得更加合理一点应该设置一个核心线程数corePoolSize。
1)当工作线程数wc < 核心线程数corePoolSize时,直接创建新的工作线程
2)当工作线程数wc > = 核心线程数corePoolSize时,把任务存放到队列中。如果此时工作线程为0,我们应该开启空闲线程去处理我们提交的任务(核心线程数为0),不然我们的任务就只是存在队列中而没有被执行。所以我们还需要看看工作线程是否为0,如果为0则应该创建线程去处理这些任务。
3)当队列满了的时候 & 当工作线程数wc <= 最大线程数maximumPoolSize,创建新的工作线程线程(空闲线程)
4)当队列满了的时候 & 当工作线程数wc > 最大线程数maximumPoolSize,执行拒绝策略
3、一般来说我们的线程池的任务队列应该也要有个最大值,不能一直存,不然会有内存溢出的风险。但也不能排除有些场景需要用到*队列,所以我们的队列也作一个简单的抽象,分为有界和*队列。这里我们就只实现一个有界队列,有界队列实现了再看*队列就很简单了。当这个队列满了的话,到底要不要放弃这个任务或者以什么方式放弃这个任务都应该视应用场景来划分,所以不同的场景会有不同的策略,我们这里可以使用策略模式对不同的场景设计不同的策略。
通过分析以上的三个问题,我们可以得到一个线程池的大概工作流程图:
接下来分析一下我们线程池的一个组成部分:workers(工作线程集合)、队列、拒绝策略以及一个线程工厂,还有核心线程数和最大线程数。除此之外,我们应该还要设计一个线程睡眠等待的最大时间,如果超过了这个时间还没有任务就把线程回收掉。然后我们的这个线程池还要有个核心的api就是submit来提交我们的任务。现在我们来设计一下我们这个线程池的类图(图可能画得不太好看,将就着看吧):
设计代码
这里我也模仿一下jdk的源码设计两个接口:Executor和ThreadPool(ThreadPool继承Executor),Executor提供一个execute的api执行任务,ThreadPool则提供submit和processWorkerExit两个api用来提交任务和释放线程,这两个接口的代码如下:
public interface Executor {
/**
* 执行具体的task
*/
void excute(Runnable task);
}
public interface ThreadPool extends Executor {
/**
* 处理提交的任务
*
* @param task
*/
void submit(Runnable task);
/**
* 释放线程
*
* @param worker
*/
void processWorkerExit(Worker worker);
}
然后再设计一个AbstractThreadPool抽象类提供submit的实现:
public abstract class AbstractThreadPool implements ThreadPool {
/**
* 处理提交的任务
* 这里可以对task做一些扩展,例如增加个返回值
* 我这里就不扩展了,直接让返回类型是void
* @param task
*/
@Override
public void submit(Runnable task) {
if (task == null) throw new NullPointerException();
excute(task);
}
}
然后我们来看最终的实现类ThreadPoolExecutor:
public class ThreadPoolExecutor extends AbstractThreadPool {
//拒绝策略
private final RejectedHandler handler;
//记录工作线程的数量
private AtomicInteger workCount;
//定义一把锁
private final ReentrantLock mainLock = new ReentrantLock();
//任务队列
private final Queue queue;
//核心线程数
private final int corePoolSize;
//最大线程数
private final int maximumPoolSize;
//创建线程的工厂
private ThreadFactory factory;
//存放线程
private final HashSet<Worker> workers = new HashSet();
/**
* 构造方法,初始化线程池参数
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param queue 任务队列
* @param handler 拒绝策略
* @param factory 线程工厂--可以给线程起名
*/
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, Queue queue, RejectedHandler handler, ThreadFactory factory) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.queue = queue;
this.workCount = new AtomicInteger();
this.handler = handler;
this.factory = factory;
}
/**
* 执行提交的任务
*
* @param task
*/
@Override
public void excute(Runnable task) {
if (task == null) throw new NullPointerException();
/**
* 当前工作线程数
* 1、wc < corePoolSize,新增一个核心线程worker
* 2、wc > corePoolSize,把task入队
* 1)入队成功->判断线程池是否还有工作线程,没有的话再创建一个工作线程
* 2)入队失败(队列已满)->创建空闲工作线程
* ①创建成功->执行task
* ②创建失败(空闲线程已满)->执行拒绝策略
*/
int wc = workCount.get();
if (wc < corePoolSize) {
if (addworker(task, true)) {
return;
}
}
if (queue.offer(task)) {
//假如创建的线程池的核心线程数为空
//那么一开始就没有工作线程,这种情况创建空闲线程
//给个null的任务给它则会从队列中取任务
wc = workCount.get();
if (wc == 0) {
addworker(null, false);
}
} else if (!addworker(task, false)) {//队列满了,我们尝试创建空闲线程
//如果创建线程失败,则执行拒绝策略
handler.rejected(task, this);
}
}
/**
* @param task 执行的任务
* @param core 核心线程数 or 最大线程数
* @return
*/
private boolean addworker(Runnable task, boolean core) {
//cas 把工作线程数先加上
for (; ; ) {
int wc = workCount.get();
int size = core ? corePoolSize : maximumPoolSize;
if (wc >= size)//工作线程已经达到了最大则返回false
return false;
if (workCount.compareAndSet(wc, wc + 1)) {
break;
}
}
//创建一个工作线程
Worker w = new Worker(task, queue, factory, this);
Thread t = w.thread;
boolean workerStarted = false;
boolean workerAdded = false;
if (t != null) {
//判断线程的状态
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
/**
* 先存储新new的worker
*/
mainLock.lock();
try {
workers.add(w);
workerAdded = true;
} finally {
mainLock.unlock();
}
}
try {
/**
* 存储成功后再启动线程
*/
if (workerAdded) {
t.start();//启动线程
workerStarted = true;
if (!core) {
System.out.println("队列中有" + queue.size() + "个任务,已满创建空闲线程");
}
}
} finally {
if (!workerStarted) {
//释放work
processWorkerExit(w);
}
}
return workerStarted;
}
/**
* 释放工作线程
*
* @param worker
*/
@Override
public void processWorkerExit(Worker worker) {
mainLock.lock();
try {
//释放掉工作线程
workers.remove(worker);
workCount.decrementAndGet();
} finally {
mainLock.unlock();
}
}
}
ThreadPoolExecutor主要提供execute和processWorkerExit的实现,execute处理提交的任务,processWorkerExit则用来释放工作线程。execute处理处理任务的时候优先给这个任务分配一个新的线程,如果此时的工作线程数已经大于核心线程数了,没法分配了则把这个任务存到任务队列里面,等待其他线程执行完了再来队列里面取这个任务执行。如果任务队列也满了,放不下了并且工作线程数还没达到最大线程数,则给这个任务分配一个新的线程,工作线程数已经达到了最大值则执行拒绝策略。拒绝策略我这里给了两个实现,一个是直接抛异常,另一个则是用提交任务的线程执行:
public interface RejectedHandler {
void rejected(Runnable r, ThreadPool pool);
}
public class AbortPolicy implements RejectedHandler {
@Override
public void rejected(Runnable r, ThreadPool pool)
//直接抛异常
throw new RejectedException("Task is rejuectd");
}
}
public class CallerRunsPolicy implements RejectedHandler {
@Override
public void rejected(Runnable r, ThreadPool pool)
//用提交任务的线程执行这个任务
if (r != null)
r.run();
}
}
接下来,我们还得设计一下我们的工作线程,工作线程主要是执行我们提交的task,如果执行完了再从队列里面去取:
public class Worker implements Runnable {
private Runnable task;
protected Thread thread;
private Queue queue;
private final ThreadPool pool;
/**
* 构造方法,初始化worker
*
* @param task
*/
Worker(Runnable task, Queue queue, ThreadFactory factory, ThreadPool pool) {
this.task = task;
this.thread = factory.createThread(this);
this.queue = queue;
this.pool = pool;
}
@Override
public void run() {
try {
while (task != null || (task = getTask()) != null) {
task.run();
//执行完了则把task置为空,让它执行getTask去队列里面去取
task = null;
}
} finally {
//如果已经没有任务可以执行了,则回收掉这个线程
pool.processWorkerExit(this);
}
}
/**
* 从队列中获取一个任务
*
* @return
*/
private Runnable getTask() {
return queue.poll();
}
}
然后我们再来看看队列的实现:
public interface Queue {
boolean offer(Runnable run);
Runnable poll();
int size();
}
public class BlockingQueue implements Queue {
private final ReentrantLock lock = new ReentrantLock();
//任务队列
private final LinkedList<Runnable> queue = new LinkedList<>();
private int size;
private long timeOut;
/**
* 空的条件队列
*/
private final Condition empty;
/**
* 满的条件队列
*/
private final Condition full;
/**
* 构造方法
*
* @param capacity
* @param keepAliveTime
* @param unit
*/
BlockingQueue(int capacity, long keepAliveTime,
TimeUnit unit) {
this.size = capacity;
empty = lock.newCondition();
full = lock.newCondition();
this.timeOut = unit.toNanos(keepAliveTime);//超时时间
}
/**
* 存一个任务
*
* @param run
* @return
*/
@Override
public boolean offer(Runnable run) {
if (run == null) {
throw new NullPointerException();
}
lock.lock();
try {
long n = timeOut;
//队列满了则睡眠等待
while (size == queue.size()) {
if (n <= 0) {
//超时则入队失败
return false;
}
//如果还没到时间则继续睡眠
n = full.awaitNanos(n);
}
queue.addLast(run);
//唤醒一个线程告诉它可以从队列里面取任务了
empty.signal();
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return false;
}
/**
* 取一个任务
*
* @return
*/
@Override
public Runnable poll() {
lock.lock();
try {
long n = timeOut;
while (queue.isEmpty()) {
if (n <= 0) {
//超时则返回null,返回null后工作线程就会被回收掉
return null;
}
System.out.println(Thread.currentThread().getName() + "队列为空,睡眠等待");
//如果还没到时间则继续睡眠
n = empty.awaitNanos(n);
}
//取出一个任务返回执行
Runnable task = queue.removeFirst();
//唤醒一个线程,告诉它可以往队列里面存了
full.signal();
return task;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
/**
* 队列的大小
*
* @return
*/
@Override
public int size() {
return queue.size();
}
}
我这里设计的队列是一个有界队列,如果队列满了让线程睡眠一段时间等队列有位置了再存,如果一直没等到则返回false表示入队失败。如果队列没有任务可以取了,则让这个线程也睡眠等待一段时间,如果没有等到则返回null,返回null工作线程拿不到任务就会被回收掉。
然后我们写个测试方法来测试一下:
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
//创建一个核心线程数为1,最大线程数为3,大小为1的队列的线程池
ThreadPool pool = new ThreadPoolExecutor(1, 3, new BlockingQueue(1, 1, TimeUnit.SECONDS), new CallerRunsPolicy(), run -> new Thread(run, "thread" + i.incrementAndGet()));
/**
* 提交5个任务,这个任务睡眠两秒后打印当前的线程和任务名
*/
for (int j = 0; j < 5; j++) {
String taskname = " run task" + j;
pool.submit(() -> {
System.out.println(Thread.currentThread().getName() + taskname + " start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + taskname + " end");
});
}
}
执行结果如下:
当我们提交五个任务的时候,由于核心线程数只有一个,所以还没执行完则往队列里面去存放,而队列只能存放一个任务,所以放了一个后就满了,此时就会创建空闲线程thread2执行提交的任务。此时t1执行task0,task1存到队列里面,当task2来了则创建thread2来执行task2,然后t1已经执行完了则从队列里面去拿task1执行,此时task3来了队列没满又被放到队列里面。接着task4来了,发现队列满了而线程数还没达到最大则创建thread3执行这个任务。然后thread2也执行完了,发现队列里面还有个task3,于是执行task3。再接着thread1又执行完了,但此时队列已经没有任务了睡眠等待,然后thread3也执行完了也睡眠,最后thread2也执行完了也睡眠了,然后睡眠一段时间都被回收,整个线程池结束。这里思考个问题:为什么jdk的线程池不会结束?(文末解答)
当然啦,这只是这次的执行结果是这样而已,每次执行的结果都不一定,因为每次cpu的调度可能不一样。下面再来验证一下两个拒绝策略,首先用CallerRunsPolicy策略,如果空闲线程和队列都满了则提交的线程来执行,然后把最大线程数设为1:
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
//创建一个核心线程数为1,最大线程数为1,大小为1的队列的线程池
ThreadPool pool = new ThreadPoolExecutor(1, 1, new BlockingQueue(1, 1, TimeUnit.SECONDS), new CallerRunsPolicy(), run -> new Thread(run, "thread" + i.incrementAndGet()));
/**
* 提交5个任务,这个任务睡眠两秒后打印当前的线程和任务名
*/
for (int j = 0; j < 5; j++) {
String taskname = " run task" + j;
pool.submit(() -> {
System.out.println(Thread.currentThread().getName() + taskname + " start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + taskname + " end");
});
}
}
结果:
可以看到当我们线程池的最大线程数是1的时候只会创建一个线程,这时队列也只能存一个任务,当task2来了肯定执行不了并且队列也满了,此时的拒绝策略则由我们的main方法来执行。然后我们试试AbortPolicy看看结果:
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
//创建一个核心线程数为1,最大线程数为2,大小为1的队列的线程池
ThreadPool pool = new ThreadPoolExecutor(1, 1, new BlockingQueue(1, 1, TimeUnit.SECONDS), new AbortPolicy(), run -> new Thread(run, "thread" + i.incrementAndGet()));
/**
* 提交5个任务,这个任务睡眠两秒后打印当前的线程和任务名
*/
for (int j = 0; j < 5; j++) {
String taskname = " run task" + j;
//由于拒绝策略会抛异常,所以捕获一下
try {
pool.submit(() -> {
System.out.println(Thread.currentThread().getName() + taskname + " start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + taskname + " end");
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果:
jdk线程池的分析
上面的代码就是线程池的一个工作原理,不知道有没发现,上面的线程池是没有状态的,我们是没办法修改线程池的状态。下面我们来分析一下jdk线程池的状态,首先我们来看看jdk线程池的一段代码:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
在jdk线程池里面有个AtomicInteger对象ctl,它是通过一个方法ctlOf来初始化的,而ctlOf有两个参数,一个是状态,另一个是wc工作线程的数量。所以这里可以看到ctl是装载着两个信息:线程池的状态和工作线程的数量。为什么要这样设计?这是因为如果多线程来操作线程池的状态和工作线程的数量的时候要加锁,而设计成一个AtomicInteger对象则可以通过CAS的操作保证线程安全,避免了加锁的操作(所以我们写代码遇到多线程的情况时也可以参考这种写法)。
通过分析上面的代码可以知道ctl的前三位代表线程池的状态,后面的29代表的是工作线程的数量。前三位的解析如表格所示:
状态 | value | 说明 |
---|---|---|
RUNNING(当线程池创建出来的初始状态) | 111 | 能接受任务,能执行阻塞任务 |
SHUTDOWN(调用shutdown方法) | 000 | 不接受新任务,但已提交的任务会执行完(包括队列中的任务),不会阻塞调用线程的执行。(线程池shutdown之后可以调用awaitTermination让调用线程等待所有任务执行完,如果超时则不等。主要用于回收资源) |
STOP(调用shutDownNow) | 001 | 不接受新任务,打断正在执行的任务,丢弃阻塞任务。会将队列中的任务返回(List)并用 interrupt 的方式中断正在执行的任务 |
TIDYING(中间状态) | 010 | 任务全部执行完,活动线程也没了 |
TERMINATED(终结状态) | 011 | 线程池终结状态 |
jdk线程池还提供了一个工厂类,这里挑几个常用的工厂方法作解析(ForkJoinPool先忽略):
public class Executors {
/**
*
* 创建一个线程数固定的线程池(没有空闲线程),因此也无需超时时间
* 核心线程数 == 最大线程数
* 阻塞队列是*的,可以放任意数量的任务
* 适用于任务量已知,相对耗时的任务
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 和上面一样,不过多了个线程工厂,可以通过线程工厂给线程池里的线程起名
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
/**
* 创建只有一个工作线程的线程池并且这个线程数不能修改,队列也是*队列
* 适用于多个任务需要排队执行的情况
* 和自己创建线程的区别:
* 自己创建线程执行任务过程中出错终止则没有任何补救措施
* 线程池遇到出错终止的情况还会创建一个新的线程来保证线程池的正常工作
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 这个不多说,也是可以传一个线程工厂
*
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
/**
* 核心线程数为0,最大线程数是Integer.MAX_VALUE
* 全部都是空闲60s后回收,队列是同步队列,当任务被取走以后才能继续入队,使用同步队列的原因是为了防止一下子创建很多线程
* 有任务来了就看有无线程可用,有则复用,没有则创建新线程来执行任务
* 终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源
* 这种线程池比较灵活,对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* 这个也一样,也是可以传一个线程工厂
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
/**
* 创建一个延时线程池,只有一个线程
* 通过schedule提交一个延时任务
*
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* 也是创建一个延时线程池,但可以指定线程工厂
*
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
/**
* 创建一个延时线程池,可以指定线程池的核心线程数
* 也是通过schedule提交一个延时任务
*
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* 创建一个延时线程池,可以指定线程池的核心线程数和线程工厂
* 也是通过schedule提交一个延时任务
*
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
/** 防止被实例化. */
private Executors() {}
}
接下来再弄个表格总结一下吧:
线程池 | 特点 | 场景 |
---|---|---|
newFixedThreadPool | 创建固定线程数的线程池,但这个线程数可以被修改 | 适用于任务量已知,相对耗时的任务 |
newSingleThreadExecutor | 线程池的线程数只能是1,不能被修改 | 适用于多个任务需要排队执行的情况 |
newCachedThreadPool | 核心线程数为0,可以创建非常多的空闲线程 | 对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能 |
newScheduledThreadPool | newScheduledThreadPool | 任务需要延时执行或者定时执行 |
jdk的线程池的submit提交任务是可以有返回值的:
public class TestPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建一个线程数为1的线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
//提交一个任务,并且返回success的字符串
Future<String> future = executorService.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 1");
return "success";
});
//打印线程执行完后的返回值
System.out.println(Thread.currentThread().getName() + " " +future.get());
}
}
执行结果:
提交多个任务:
public static void main(String[] args) throws InterruptedException {
//创建一个线程数为1的线程池
ExecutorService executorService = Executors.newFixedThreadPool(1);
//提交多个任务,返回1、2、3
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(() -> "1", () -> "2", () -> "3"));
//拿到返回结果打印
futures.forEach(f -> {
try {
System.out.println(f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
}
执行结果:
提交多个任务,随机执行一个:
public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建一个线程数为3的线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
//提交多个任务,2只睡眠500毫秒,所以2先执行拿到的结果是2
String s = executorService.invokeAny(Arrays.asList(() -> {
TimeUnit.MILLISECONDS.sleep(5000);
return "1";
}, () -> {
TimeUnit.MILLISECONDS.sleep(500);
return "2";
}, () -> {
TimeUnit.MILLISECONDS.sleep(3000);
return "3";
}));
System.out.println(s);
}
执行结果:
因为有3个线程,所以谁先执行则返回谁,其他没执行的线程则被打断。如果只有一个线程,那么就只会执行第一个提交的任务返回。
下面演示一下如何用延时线程池提交定时任务,每隔3秒钟执行一次:
public static void main(String[] args) {
//创建一个延时的线程池
scheduledExecutorService = Executors.newScheduledThreadPool(1);
//提交一个延时任务,3秒钟后执行,执行的内容是executeTask
scheduledExecutorService.schedule(()->executeTask(),3,TimeUnit.SECONDS);
}
public static void executeTask(){
//打印当前执行的秒数
System.out.println(LocalDateTime.now().getSecond() + " execute task");
//打印完后再提交一个executeTask的延时任务,3秒后执行
scheduledExecutorService.schedule(()->executeTask(),3,TimeUnit.SECONDS);
}
执行结果:
除了这种方式以外还有两种方式可以实现定时任务,第一种是使用它的api:scheduleAtFixedRate,这种情况的第三个参数是period是包含线程的执行时间的:
public static void main(String[] args) {
//创建一个延时的线程池
scheduledExecutorService = Executors.newScheduledThreadPool(1);
//提交一个延时任务,第一个任务间隔2秒开始执行第二个任务,往后的任务都是间隔1秒
scheduledExecutorService.scheduleAtFixedRate(()->{
System.out.println(LocalDateTime.now().getSecond() + " execute task");
/**
* 由于这里睡眠了2秒,所以后面的任务都没有再等待1s就执行了
*/
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
},2,1,TimeUnit.SECONDS);
}
执行结果如下:
从结果中可以看到,每隔2秒执行一次,说明scheduleAtFixedRate方法是从线程开始执行就计时了,所以睡眠了两秒以后那1秒的间隔时间也到了,因此不再等待就直接执行。
第二种是scheduleWithFixedDelay,第三个参数是delay是等线程执行完才算等待时间:
public static void main(String[] args) {
//创建一个延时的线程池
scheduledExecutorService = Executors.newScheduledThreadPool(1);
//提交一个延时任务,第一个任务间隔2秒开始执行第二个任务,往后的任务都是间隔1秒
scheduledExecutorService.scheduleWithFixedDelay(()->{
System.out.println(LocalDateTime.now().getSecond() + " execute task");
/**
* 即是睡眠了2秒,也还会等待1秒才执行
*/
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
},2,1,TimeUnit.SECONDS);
}
执行结果如下:
解答思考题
最后我们来解答上面遗留下来的问题:为什么jdk的线程池任务执行完了以后不会结束?
首先我们先看看下面的这个例子:
public static void main(String[] args) {
//创建一个核心线程数为0的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
threadPoolExecutor.execute(()-> System.out.println("execute"));
}
执行结果如下:
当核心线程池为0的时候任务执行完了线程池就结束了,如果我们创建的核心线程数不为0的线程池时:
public static void main(String[] args) {
//核心线程数不为0
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
threadPoolExecutor.execute(()-> System.out.println("execute"));
}
执行结果如下:
通过上面的例子可以得到一个结论:jdk的线程池不会结束是因为有核心线程数,也就是只会回收空闲线程而不会回收核心线程。
那么jdk的线程池是怎么实现的呢?我们来看看jdk线程池从队列中取任务的代码:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* 注意这里的timed值,如果运行超时或者工作线程数大于核心线程数则为true
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 如果timed为true去调用poll方法,否则调用take方法取任务
* poll方法有超时时间,超过这个时间则回收线程,take方法则没有超时时间
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
通过注释我们可以看到allowCoreThreadTimeOut的值默认是false,所以默认情况下如果线程池的线程数小于核心线程数则timed的值就为false,所以就会用workQueue的take方法获取任务,而take方法不会超时。
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
本文地址:https://blog.csdn.net/JankeDeng/article/details/109379567