欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java简单手写版本实现时间轮算法

程序员文章站 2022-03-27 08:14:14
时间轮关于时间轮的介绍,网上有很多,这里就不重复了核心思想 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮...

时间轮

关于时间轮的介绍,网上有很多,这里就不重复了

核心思想

  • 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
  • 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
  • 每个槽对应一个环形链表存储该时间应该被执行的任务
  • 需要一个线程去驱动指针运转,获取到期任务

以下给出java 简单手写版本实现

代码实现

时间轮主数据结构

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:31
 */
@slf4j
public class timewheel {
  /**
   * 一个槽的时间间隔(时间轮最小刻度)
   */
  private long tickms;

  /**
   * 时间轮大小(槽的个数)
   */
  private int wheelsize;

  /**
   * 一轮的时间跨度
   */
  private long interval;

  private long currenttime;

  /**
   * 槽
   */
  private timertasklist[] buckets;

  /**
   * 上层时间轮
   */
  private volatile timewheel overflowwheel;

  /**
   * 一个timer只有一个delayqueue
   */
  private delayqueue<timertasklist> delayqueue;

  public timewheel(long tickms, int wheelsize, long currenttime, delayqueue<timertasklist> delayqueue) {
    this.currenttime = currenttime;
    this.tickms = tickms;
    this.wheelsize = wheelsize;
    this.interval = tickms * wheelsize;
    this.buckets = new timertasklist[wheelsize];
    this.currenttime = currenttime - (currenttime % tickms);
    this.delayqueue = delayqueue;
    for (int i = 0; i < wheelsize; i++) {
      buckets[i] = new timertasklist();
    }
  }

  public boolean add(timertaskentry entry) {
    long expiration = entry.getexpirems();
    if (expiration < tickms + currenttime) {
      //到期了
      return false;
    } else if (expiration < currenttime + interval) {
      //扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去
      long virtualid = (expiration / tickms);
      int index = (int) (virtualid % wheelsize);
      timertasklist bucket = buckets[index];
      bucket.addtask(entry);
      //设置bucket 过期时间
      if (bucket.setexpiration(virtualid * tickms)) {
        //设好过期时间的bucket需要入队
        delayqueue.offer(bucket);
        return true;
      }
    } else {
      //当前轮不能满足,需要扔到上一轮
      timewheel timewheel = getoverflowwheel();
      return timewheel.add(entry);
    }
    return false;
  }


  private timewheel getoverflowwheel() {
    if (overflowwheel == null) {
      synchronized (this) {
        if (overflowwheel == null) {
          overflowwheel = new timewheel(interval, wheelsize, currenttime, delayqueue);
        }
      }
    }
    return overflowwheel;
  }

  /**
   * 推进指针
   *
   * @param timestamp
   */
  public void advancelock(long timestamp) {
    if (timestamp > currenttime + tickms) {
      currenttime = timestamp - (timestamp % tickms);
      if (overflowwheel != null) {
        this.getoverflowwheel().advancelock(timestamp);
      }
    }
  }
}

定时器接口

/**
 * 定时器
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:30
 */
public interface timer {

  /**
   * 添加一个新任务
   *
   * @param timertask
   */
  void add(timertask timertask);


  /**
   * 推动指针
   *
   * @param timeout
   */
  void advanceclock(long timeout);

  /**
   * 等待执行的任务
   *
   * @return
   */
  int size();

  /**
   * 关闭服务,剩下的无法被执行
   */
  void shutdown();
}

定时器实现

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:33
 */
@slf4j
public class systemtimer implements timer {
  /**
   * 底层时间轮
   */
  private timewheel timewheel;
  /**
   * 一个timer只有一个延时队列
   */
  private delayqueue<timertasklist> delayqueue = new delayqueue<>();
  /**
   * 过期任务执行线程
   */
  private executorservice workerthreadpool;
  /**
   * 轮询delayqueue获取过期任务线程
   */
  private executorservice bossthreadpool;


  public systemtimer() {
    this.timewheel = new timewheel(1, 20, system.currenttimemillis(), delayqueue);
    this.workerthreadpool = executors.newfixedthreadpool(100);
    this.bossthreadpool = executors.newfixedthreadpool(1);
    //20ms推动一次时间轮运转
    this.bossthreadpool.submit(() -> {
      for (; ; ) {
        this.advanceclock(20);
      }
    });
  }


  public void addtimertaskentry(timertaskentry entry) {
    if (!timewheel.add(entry)) {
      //已经过期了
      timertask timertask = entry.gettimertask();
      log.info("=====任务:{} 已到期,准备执行============",timertask.getdesc());
      workerthreadpool.submit(timertask);
    }
  }

  @override
  public void add(timertask timertask) {
    log.info("=======添加任务开始====task:{}", timertask.getdesc());
    timertaskentry entry = new timertaskentry(timertask, timertask.getdelayms() + system.currenttimemillis());
    timertask.settimertaskentry(entry);
    addtimertaskentry(entry);
  }

  /**
   * 推动指针运转获取过期任务
   *
   * @param timeout 时间间隔
   * @return
   */
  @override
  public synchronized void advanceclock(long timeout) {
    try {
      timertasklist bucket = delayqueue.poll(timeout, timeunit.milliseconds);
      if (bucket != null) {
        //推进时间
        timewheel.advancelock(bucket.getexpiration());
        //执行过期任务(包含降级)
        bucket.clear(this::addtimertaskentry);
      }
    } catch (interruptedexception e) {
      log.error("advanceclock error");
    }
  }

  @override
  public int size() {
    //todo
    return 0;
  }

  @override
  public void shutdown() {
    this.bossthreadpool.shutdown();
    this.workerthreadpool.shutdown();
    this.timewheel = null;
  }
}

存储任务的环形链表

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@data
@slf4j
class timertasklist implements delayed {
  /**
   * timertasklist 环形链表使用一个虚拟根节点root
   */
  private timertaskentry root = new timertaskentry(null, -1);

  {
    root.next = root;
    root.prev = root;
  }

  /**
   * bucket的过期时间
   */
  private atomiclong expiration = new atomiclong(-1l);

  public long getexpiration() {
    return expiration.get();
  }

  /**
   * 设置bucket的过期时间,设置成功返回true
   *
   * @param expirationms
   * @return
   */
  boolean setexpiration(long expirationms) {
    return expiration.getandset(expirationms) != expirationms;
  }

  public boolean addtask(timertaskentry entry) {
    boolean done = false;
    while (!done) {
      //如果timertaskentry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止
      entry.remove();
      synchronized (this) {
        if (entry.timedtasklist == null) {
          //加到链表的末尾
          entry.timedtasklist = this;
          timertaskentry tail = root.prev;
          entry.prev = tail;
          entry.next = root;
          tail.next = entry;
          root.prev = entry;
          done = true;
        }
      }
    }
    return true;
  }

  /**
   * 从 timedtasklist 移除指定的 timertaskentry
   *
   * @param entry
   */
  public void remove(timertaskentry entry) {
    synchronized (this) {
      if (entry.gettimedtasklist().equals(this)) {
        entry.next.prev = entry.prev;
        entry.prev.next = entry.next;
        entry.next = null;
        entry.prev = null;
        entry.timedtasklist = null;
      }
    }
  }

  /**
   * 移除所有
   */
  public synchronized void clear(consumer<timertaskentry> entry) {
    timertaskentry head = root.next;
    while (!head.equals(root)) {
      remove(head);
      entry.accept(head);
      head = root.next;
    }
    expiration.set(-1l);
  }

  @override
  public long getdelay(timeunit unit) {
    return math.max(0, unit.convert(expiration.get() - system.currenttimemillis(), timeunit.milliseconds));
  }

  @override
  public int compareto(delayed o) {
    if (o instanceof timertasklist) {
      return long.compare(expiration.get(), ((timertasklist) o).expiration.get());
    }
    return 0;
  }
}

存储任务的容器entry

/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@data
class timertaskentry implements comparable<timertaskentry> {
  private timertask timertask;
  private long expirems;
  volatile timertasklist timedtasklist;
  timertaskentry next;
  timertaskentry prev;

  public timertaskentry(timertask timedtask, long expirems) {
    this.timertask = timedtask;
    this.expirems = expirems;
    this.next = null;
    this.prev = null;
  }

  void remove() {
    timertasklist currentlist = timedtasklist;
    while (currentlist != null) {
      currentlist.remove(this);
      currentlist = timedtasklist;
    }
  }

  @override
  public int compareto(timertaskentry o) {
    return ((int) (this.expirems - o.expirems));
  }
}

任务包装类(这里也可以将工作任务以线程变量的方式去传入)

@data
@slf4j
class timertask implements runnable {
  /**
   * 延时时间
   */
  private long delayms;
  /**
   * 任务所在的entry
   */
  private timertaskentry timertaskentry;

  private string desc;

  public timertask(string desc, long delayms) {
    this.desc = desc;
    this.delayms = delayms;
    this.timertaskentry = null;
  }

  public synchronized void settimertaskentry(timertaskentry entry) {
    // 如果这个timetask已经被一个已存在的timertaskentry持有,先移除一个
    if (timertaskentry != null && timertaskentry != entry) {
      timertaskentry.remove();
    }
    timertaskentry = entry;
  }

  public timertaskentry gettimertaskentry() {
    return timertaskentry;
  }

  @override
  public void run() {
    log.info("============={}任务执行", desc);
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

相关标签: java 时间轮