线程池的四种阻塞队列与四种任务拒绝策略
四种拒绝策略:
/**
* 配置线程池的四种拒绝策略
* */
public class RejectPolicy {
/**
* 丢弃任务并抛出RejectedExecutionException异常
* */
public static RejectedExecutionHandler abortPolicy =
new ThreadPoolExecutor.AbortPolicy();
/**
* 丢弃任务但是不抛出异常
* */
public static RejectedExecutionHandler discardPolicy =
new ThreadPoolExecutor.DiscardPolicy();
/**
* 丢弃队列最前面的任务,最新任务入列
* */
public static RejectedExecutionHandler discardOldestPolicy =
new ThreadPoolExecutor.DiscardOldestPolicy();
/**
* 由调用线程处理该任务
* */
public static RejectedExecutionHandler callerRunsPolicy =
new ThreadPoolExecutor.CallerRunsPolicy();
}
1、RejectPolicy.discardPolicy
public class ThreadPool {
private static final Integer corePoolSize = 3;
private static final Integer maxMumPoolSize = 5;
private static final Integer keepAliveTime = 60000;
public static ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(corePoolSize,
maxMumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
Queue.arrayBlockingQueue,
RejectPolicy.discardPolicy);
}
public static void main(String[] args){
for (int i = 1; i <= 10; i++) {
ThreadPool.threadPool.submit(new Task(String.valueOf(i)));
}
}
共生成10个线程,线程池最多同时处理5个线程,外加阻塞队列里面的2个线程,其余3个到达线程池根据任务丢弃策略直接被丢弃。运行结果如下:
任务1正在运行
任务4正在运行
任务5正在运行
任务7正在运行
任务2正在运行
任务3正在运行
任务6正在运行
2、RejectPolicy.abortPolicy
public class ThreadPool {
private static final Integer corePoolSize = 3;
private static final Integer maxMumPoolSize = 5;
private static final Integer keepAliveTime = 60000;
public static ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(corePoolSize,
maxMumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
Queue.arrayBlockingQueue,
RejectPolicy.abortPolicy);
}
共生成10个线程,线程池最多同时处理5个线程,外加阻塞队列里面的2个线程,其余3个到达线程池根据任务丢弃策略被丢弃并抛出异常。运行结果如下:
任务1正在运行
任务4正在运行
任务5正在运行
任务2正在运行
任务3正在运行
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task aaa@qq.com rejected from aaa@qq.com[Running, pool size = 5, active threads = 3, queued tasks = 0, completed tasks = 4]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.frame.threadpool.taskblockingqueue.Test.main(Test.java:7)
任务7正在运行
任务6正在运行
3、RejectPolicy.discardOldestPolicy
public class ThreadPool {
private static final Integer corePoolSize = 1;
private static final Integer maxMumPoolSize = 3;
private static final Integer keepAliveTime = 60000;
public static ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(corePoolSize,
maxMumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
Queue.arrayBlockingQueue,
RejectPolicy.discardOldestPolicy);
}
处理过程如下图:
原图见:https://www.processon.com/diagraming/5c6f66b1e4b07fada4ef4416
4、RejectPolicy.callerRunsPolicy
private static final Integer corePoolSize = 1;
private static final Integer maxMumPoolSize = 3;
private static final Integer keepAliveTime = 60000;
public static ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(corePoolSize,
maxMumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
Queue.arrayBlockingQueue,
RejectPolicy.callerRunsPolicy);
callerRunsPolicy会将所有被拒绝的任务交给主线程处理,如果主线程已经终止,则任务被丢弃。我们看callerRunsPolicy的源码很明显:
四种阻塞队列:
/**
* 配置线程池的四种阻塞队列
* */
public class Queue {
private static final Integer idlePoolSize = 2;
/**
* 基于数组的有界阻塞队列
* */
public static ArrayBlockingQueue<Runnable> arrayBlockingQueue =
new ArrayBlockingQueue<Runnable>(idlePoolSize);
/**
* *有序的阻塞队列
* */
public static PriorityBlockingQueue<Runnable> priorityBlockingQueue =
new PriorityBlockingQueue<Runnable>(idlePoolSize);
/**
* 有界阻塞队列,线程安全
* */
public static LinkedBlockingQueue<Runnable> linkedBlockingQueue =
new LinkedBlockingQueue<Runnable>(idlePoolSize);
/**
* 无缓冲等待队列,是一个不存储元素的等待队列,会直接将任务交给消费者,必须等队列中的添加元素全部被消费之后才能继续添加元素
* */
public static SynchronousQueue<Runnable> synchronous =
new SynchronousQueue<Runnable>();
}
1、ArrayBlockingQueue
线程池的缓存队列是一个有界队列,实质是一个定长数组,不允许动态改变数组长度。
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and throwing an
* {@code IllegalStateException} if this queue is full.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if this queue is full
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
每次添加的时候先判断count == items.length即当前队列中已经存在的线程数是否已经达到了我们定义的缓存队列的个数,如果已经达到,直接抛出IllegalStateException("Queue full");否则,添加到队列尾部,添加的时候加lock保证线程安全。
2、PriorityBlockingQueue
优先级队列,基于可变长数组的*队列,底层用最小堆实现,该队列定义的时候要么传入比较器,要么其任务对象类需要实现comparable接口,否则会报诸如一下的错误:
java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
所有对该队列的定义最终都会落到上面的实例化方法,由此可见该队列在定义的时候需要传入一个长度和一个比较器,我们猜测在运行时会根据比较器顺序调用队列中的线程。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
添加队列时,如果实际存在的队列个数>=我们自定义的队列个数,则执行tryGrow(array, cap);尝试去增加数组长度,
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
判断当前数组长度是否大于64,若小于64,则增加为原来长度的2倍再加2,否则,则增加为原来长度的1.5倍。
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
执行数组copy操作。
扩展完数组长度,判断是否定义了比较器,如果没有定义比较器,强转成
Comparable<? super T> key = (Comparable<? super T>) x;
再入队,所以我们上面提到如果实例化的时候没有定义比较器,需要任务类实现Comparable接口才行,否则会报强转错误。如果定义了比较器,直接根据比较器比较结果入队即可。
关于如何定义比较器及实现类如何实现比较器接口不再赘述,请参看https://blog.csdn.net/qq_35689573/article/details/80568983中的TreeSet小节。
那我们再来看一下,每次是如何取出任务执行的呢?
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
每次取出数组中的第一个元素并根据比较器重新排序。
用这个阻塞队列,线程池中的线程数理论上来说是永远不会达到maxPoolSize的,我们实践一下:
/**
* 任务类
* @author Hexiangjun
* @version 2019-02-25 10:17
* */
public class Task implements Runnable{
private String taskName;
public Task(String taskName){
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("任务" + this.taskName + "正在运行");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务" + this.taskName + "已经完成。当前活跃线程数:" + ThreadPool.threadPool.getActiveCount()
+ ",已完成任务数:" + ThreadPool.threadPool.getCompletedTaskCount()
+ ",阻塞队列中的任务数:" + ThreadPool.threadPool.getQueue().size());
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
}
/**
* 自定义比较器
* @author Hexiangjun
* @version 2019-02-15 10:18
* */
public class Comp implements Comparator<Task>{
@Override
public int compare(Task o1, Task o2) {
return Integer.parseInt(o2.getTaskName())-Integer.parseInt(o1.getTaskName());
}
}
/**
* 可变长数组的有序队列
* */
static Comparator<Task> c = new Comp();
public static PriorityBlockingQueue priorityBlockingQueue =
new PriorityBlockingQueue(idlePoolSize,c);
public static void main(String[] args){
for (int i = 1; i <= 10; i++) {
ThreadPool.threadPool.execute(new Task(String.valueOf(i)));
System.out.println("当前活跃线程数:" + ThreadPool.threadPool.getActiveCount()
+ ",已完成任务数:" + ThreadPool.threadPool.getCompletedTaskCount()
+ ",阻塞队列中的任务数:" + ThreadPool.threadPool.getQueue().size());
}
}
运行结果:
任务1正在运行
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:0
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:1
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:2
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:3
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:4
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:5
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:6
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:7
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:8
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:9
任务1已经完成。当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:9
任务10正在运行
任务10已经完成。当前活跃线程数:1,已完成任务数:1,阻塞队列中的任务数:8
任务9正在运行
任务9已经完成。当前活跃线程数:1,已完成任务数:2,阻塞队列中的任务数:7
任务8正在运行
任务8已经完成。当前活跃线程数:1,已完成任务数:3,阻塞队列中的任务数:6
任务7正在运行
任务7已经完成。当前活跃线程数:1,已完成任务数:4,阻塞队列中的任务数:5
任务6正在运行
任务6已经完成。当前活跃线程数:1,已完成任务数:5,阻塞队列中的任务数:4
任务5正在运行
任务5已经完成。当前活跃线程数:1,已完成任务数:6,阻塞队列中的任务数:3
任务4正在运行
任务4已经完成。当前活跃线程数:1,已完成任务数:7,阻塞队列中的任务数:2
任务3正在运行
任务3已经完成。当前活跃线程数:1,已完成任务数:8,阻塞队列中的任务数:1
任务2正在运行
任务2已经完成。当前活跃线程数:1,已完成任务数:9,阻塞队列中的任务数:0
我们在比较器里面定义的是倒序执行,因此我们可以看到,入列的线程都是根据线程名称排序进入线程池的。
我们也可以给Task实现Comparable接口:
public class Task implements Runnable, Comparable<Task>{
@Override
public int compareTo(Task o) {
return Integer.valueOf(this.taskName) - Integer.valueOf(o.getTaskName());
}
}
/**
* 可变长数组的有序队列
* */
public static PriorityBlockingQueue priorityBlockingQueue =
new PriorityBlockingQueue(idlePoolSize);
运行结果:
任务1正在运行
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:0
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:1
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:2
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:3
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:4
任务1已经完成。当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:4
任务2正在运行
任务2已经完成。当前活跃线程数:1,已完成任务数:1,阻塞队列中的任务数:3
任务3正在运行
任务3已经完成。当前活跃线程数:1,已完成任务数:2,阻塞队列中的任务数:2
任务4正在运行
任务4已经完成。当前活跃线程数:1,已完成任务数:3,阻塞队列中的任务数:1
任务5正在运行
任务5已经完成。当前活跃线程数:1,已完成任务数:4,阻塞队列中的任务数:0
3、LinkedBlockingQueue
是基于链表结构的有界阻塞队列,我们先来看一下他的构造方法:
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
初始化时头结点和尾节点为同一个节点,并赋值为new Node<E>(null),下面我们通过研究put(E e)方法来看下这个队列是如何向里面放入元素的,方法用了ReentrantLock,是线程安全的:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
先把元素包装成链表中的一个节点,判断如果当前元素个数=队列最大容量,调用ReentrantLock的newCondition的await方法,放弃当前线程的锁,等待阻塞队列的任务被取出再唤醒当前线程。
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
将元素加入队列中,
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
同时count加1,判断如果当前元素个数加1后依然小于队列容量,唤醒等待线程。我们再来看一下该队列是如何取出元素的,
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
由此可见,每次取出队列中的第二个元素,并将第二个元素变成头元素。调用以上取出元素的方法有三个对外可见的元素,分别是take()、poll(long timeout, TimeUnit unit)、poll()。
take方法会无限阻塞等待直到队列中有元素取出,
while (count.get() == 0) {
notEmpty.await();
}
poll(long timeout, TimeUnit unit)会阻塞等待timeout时间,
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
poll()不会等待,会判断队列中是否有元素,没有元素则返回null
if (count.get() == 0)
return null;
还有另外一个取出方法,peek(),
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
如果队列长度为0,则返回null,否则,返回第二个元素,并且不对链表进行任何改变。至于线程池里面用的哪种方法,这里就不深究了哈,有兴趣的读者自己去研究吧。
下面我们来实践一下:
public static ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(corePoolSize,
maxMumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
Queue.linkedBlockingQueue,
RejectPolicy.discardPolicy);
public static void main(String[] args){
for (int i = 1; i <= 5; i++) {
ThreadPool.threadPool.execute(new Task(String.valueOf(i)));
System.out.println("当前活跃线程数:" + ThreadPool.threadPool.getActiveCount()
+ ",已完成任务数:" + ThreadPool.threadPool.getCompletedTaskCount()
+ ",阻塞队列中的任务数:" + ThreadPool.threadPool.getQueue().size());
}
}
运行结果:
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:0
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:1
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:2
当前活跃线程数:2,已完成任务数:0,阻塞队列中的任务数:2
当前活跃线程数:3,已完成任务数:0,阻塞队列中的任务数:2
任务1正在运行
任务4正在运行
任务5正在运行
任务5已经完成。当前活跃线程数:3,已完成任务数:0,阻塞队列中的任务数:2
任务1已经完成。当前活跃线程数:3,已完成任务数:0,阻塞队列中的任务数:2
任务2正在运行
任务3正在运行
任务4已经完成。当前活跃线程数:3,已完成任务数:2,阻塞队列中的任务数:0
任务3已经完成。当前活跃线程数:2,已完成任务数:3,阻塞队列中的任务数:0
任务2已经完成。当前活跃线程数:2,已完成任务数:3,阻塞队列中的任务数:0
从运行结果,我们可以看出,任务1先进入线程池并执行,任务2和任务3进入阻塞队列,队列达到最大容量,此时再有任务入列则阻塞,直到有任务被移出阻塞队列,此时,任务4和任务5只能进入线程池达到最大线程数,当任务1和任务5执行完毕,任务2和任务3入线程池,任务执行完毕。
4、SynchronousQueue
无缓冲等待队列,我们先来看一下他的构造方法,
/**
* Creates a {@code SynchronousQueue} with the specified fairness policy.
*
* @param fair if true, waiting threads contend in FIFO order for
* access; otherwise the order is unspecified.
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
我们在定义该队列时需要传入true或false,true-基于先进先出的队列,false-基于先进后出的栈。
该队列一个put总是对应一个take,也就是说只有消费端有消费需求,生产端才会允许任务入列。也就是说该阻塞队列中不会缓冲任何队列。
public static ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(corePoolSize,
maxMumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
Queue.synchronous,
RejectPolicy.abortPolicy);
public static void main(String[] args){
for (int i = 1; i <= 5; i++) {
ThreadPool.threadPool.execute(new Task(String.valueOf(i)));
System.out.println("当前活跃线程数:" + ThreadPool.threadPool.getActiveCount()
+ ",已完成任务数:" + ThreadPool.threadPool.getCompletedTaskCount()
+ ",阻塞队列中的任务数:" + ThreadPool.threadPool.getQueue().size());
}
}
运行结果如下:
当前活跃线程数:1,已完成任务数:0,阻塞队列中的任务数:0
任务1正在运行
当前活跃线程数:2,已完成任务数:0,阻塞队列中的任务数:0
任务2正在运行
当前活跃线程数:3,已完成任务数:0,阻塞队列中的任务数:0
任务3正在运行
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task aaa@qq.com rejected from aaa@qq.com[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.frame.threadpool.taskblockingqueue.Test.main(Test.java:8)
任务1已经完成。当前活跃线程数:3,已完成任务数:0,阻塞队列中的任务数:0
任务3已经完成。当前活跃线程数:2,已完成任务数:1,阻塞队列中的任务数:0
任务2已经完成。当前活跃线程数:1,已完成任务数:2,阻塞队列中的任务数:0
由此可见,阻塞队列中线程个数一直为0,任务1,任务2,任务3依次入列,剩下的任务4和任务5因为线程池没有消费需求,导致被抛弃。
如果想详细了解该队列,请参考:
https://blog.csdn.net/zmx729618/article/details/52980158
https://www.jianshu.com/p/376d368cb44f
这个队列的源码比较复杂,笔者也没有完全弄明呢,如果文章有出现任何错误,欢迎指正批评。
上一篇: Java线程池