欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

延迟队列DelayQueue的源码解析

程序员文章站 2024-03-18 08:46:16
...

DelayQueue类的主要作用:是一个*的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

主要属性
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    // 持有内部重入锁。
    private final transient ReentrantLock lock = new ReentrantLock();
    // 优先级队列,存放工作任务。
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    private Thread leader = null;
    // 依赖于重入锁的 condition(出队列的线程使用)
    private final Condition available = lock.newCondition();
}
1.Delayed接口

DelayQueue队列与其它的队列最大的不同就是这个队列里的元素必须实现Delayed接口才能入队,我们来看一下这个接口:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

该接口继承自Comparable,也就意味着实现了Delayed接口的类必须有两个方法getDelay和compareTo,示例类:

static class Task implements Delayed{
        long time = System.currentTimeMillis();
        public Task(long time) {
            this.time = time;
        }
        @Override
        public int compareTo(Delayed o) {
            if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;
            else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                return 1;
            else 
                return 0;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }
        @Override
        public String toString() {
            return "" + time;
        }
    }
2.内部队列PriorityQueue

DelayQueue内部使用优先级队列PriorityQueue来存放元素,PriorityQueue队列里的元素会根据某些属性排列先后的顺序,这里正好可以利用Delayed接口里的getDelay的返回值来进行排序,delayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素。

3.offer()方法
    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        // 获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // 判断是否添加成功
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

1.获取锁来执行后续操作
2.元素添加到优先级队列中
3.查看元素是否为队首,如果是队首的话,设置leader为空,唤醒一个消费线程。

这里有一个leader元素它的作用我们后面说,先看一下取元素过程

4.take()方法
/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获取可中断锁。
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 从优先级队列中获取队列头元素
                E first = q.peek();
                if (first == null)
                    // 无元素,当前线程加入等待队列,并阻塞
                    available.await();
                else {
                    // 通过getDelay 方法获取延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        // 延迟时间到期,获取并删除头部元素。
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 线程节点进入等待队列 x 纳秒。
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader == null且还存在元素的话,唤醒一个消费线程。
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

1.获取锁
2.取出优先级队列q的首元素
3.如果元素q的队首/队列为空,阻塞
3.如果元素q的队首(first)不为空,获得这个元素的delay时间值,如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法
4.如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露
5.循环以上操作,直到return

5.leader元素的使用

leader是一个Thread元素,它在offer和take中都有使用,它代表当前获取到锁的消费者线程,
我们从take里的逻辑片段来分析

if (leader != null)
      available.await();
else {
      Thread thisThread = Thread.currentThread();
      leader = thisThread;
      try {
         available.awaitNanos(delay);
      } finally {
         if (leader == thisThread)
             leader = null;
      }
}

如果leader不为null,说明已经有消费者线程拿到锁,直接阻塞当前线程,如果leader为null,把当前线程赋值给leader,并等待剩余的到期时间,最后释放leader,这里我们想象着我们有个多个消费者线程用take方法去取,如果没有leader!=null的判断,这些线程都会无限循环,直到返回第一个元素,很显然很浪费资源。所以leader的作用是设置一个标记,来避免消费者的无脑竞争。

first = null; // don't retain ref while waiting

这里是释放first,是因为first是队列第一个元素的引用,同时可以有很多线程执行,意味着有很多线程持有第一个元素的引用,很有可能导致内存溢出,所以手动释放。