延迟队列DelayQueue的源码解析
DelayQueue类的主要作用:是一个*的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。
主要属性
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 持有内部重入锁。
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存放工作任务。
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
// 依赖于重入锁的 condition(出队列的线程使用)
private final Condition available = lock.newCondition();
}
1.Delayed接口
DelayQueue队列与其它的队列最大的不同就是这个队列里的元素必须实现Delayed接口才能入队,我们来看一下这个接口:
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
该接口继承自Comparable,也就意味着实现了Delayed接口的类必须有两个方法getDelay和compareTo,示例类:
static class Task implements Delayed{
long time = System.currentTimeMillis();
public Task(long time) {
this.time = time;
}
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return "" + time;
}
}
2.内部队列PriorityQueue
DelayQueue内部使用优先级队列PriorityQueue来存放元素,PriorityQueue队列里的元素会根据某些属性排列先后的顺序,这里正好可以利用Delayed接口里的getDelay的返回值来进行排序,delayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素。
3.offer()方法
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 判断是否添加成功
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
1.获取锁来执行后续操作
2.元素添加到优先级队列中
3.查看元素是否为队首,如果是队首的话,设置leader为空,唤醒一个消费线程。
这里有一个leader元素它的作用我们后面说,先看一下取元素过程
4.take()方法
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取可中断锁。
lock.lockInterruptibly();
try {
for (;;) {
// 从优先级队列中获取队列头元素
E first = q.peek();
if (first == null)
// 无元素,当前线程加入等待队列,并阻塞
available.await();
else {
// 通过getDelay 方法获取延迟时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 延迟时间到期,获取并删除头部元素。
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 线程节点进入等待队列 x 纳秒。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader == null且还存在元素的话,唤醒一个消费线程。
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
1.获取锁
2.取出优先级队列q的首元素
3.如果元素q的队首/队列为空,阻塞
3.如果元素q的队首(first)不为空,获得这个元素的delay时间值,如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法
4.如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露
5.循环以上操作,直到return
5.leader元素的使用
leader是一个Thread元素,它在offer和take中都有使用,它代表当前获取到锁的消费者线程,
我们从take里的逻辑片段来分析
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
如果leader不为null,说明已经有消费者线程拿到锁,直接阻塞当前线程,如果leader为null,把当前线程赋值给leader,并等待剩余的到期时间,最后释放leader,这里我们想象着我们有个多个消费者线程用take方法去取,如果没有leader!=null的判断,这些线程都会无限循环,直到返回第一个元素,很显然很浪费资源。所以leader的作用是设置一个标记,来避免消费者的无脑竞争。
first = null; // don't retain ref while waiting
这里是释放first,是因为first是队列第一个元素的引用,同时可以有很多线程执行,意味着有很多线程持有第一个元素的引用,很有可能导致内存溢出,所以手动释放。