java简单手写版本实现时间轮算法
程序员文章站
2022-07-03 19:09:30
时间轮关于时间轮的介绍,网上有很多,这里就不重复了核心思想 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮...
时间轮
关于时间轮的介绍,网上有很多,这里就不重复了
核心思想
- 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
- 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
- 每个槽对应一个环形链表存储该时间应该被执行的任务
- 需要一个线程去驱动指针运转,获取到期任务
以下给出java 简单手写版本实现
代码实现
时间轮主数据结构
/** * @author apdoer * @version 1.0 * @date 2021/3/22 19:31 */ @slf4j public class timewheel { /** * 一个槽的时间间隔(时间轮最小刻度) */ private long tickms; /** * 时间轮大小(槽的个数) */ private int wheelsize; /** * 一轮的时间跨度 */ private long interval; private long currenttime; /** * 槽 */ private timertasklist[] buckets; /** * 上层时间轮 */ private volatile timewheel overflowwheel; /** * 一个timer只有一个delayqueue */ private delayqueue<timertasklist> delayqueue; public timewheel(long tickms, int wheelsize, long currenttime, delayqueue<timertasklist> delayqueue) { this.currenttime = currenttime; this.tickms = tickms; this.wheelsize = wheelsize; this.interval = tickms * wheelsize; this.buckets = new timertasklist[wheelsize]; this.currenttime = currenttime - (currenttime % tickms); this.delayqueue = delayqueue; for (int i = 0; i < wheelsize; i++) { buckets[i] = new timertasklist(); } } public boolean add(timertaskentry entry) { long expiration = entry.getexpirems(); if (expiration < tickms + currenttime) { //到期了 return false; } else if (expiration < currenttime + interval) { //扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去 long virtualid = (expiration / tickms); int index = (int) (virtualid % wheelsize); timertasklist bucket = buckets[index]; bucket.addtask(entry); //设置bucket 过期时间 if (bucket.setexpiration(virtualid * tickms)) { //设好过期时间的bucket需要入队 delayqueue.offer(bucket); return true; } } else { //当前轮不能满足,需要扔到上一轮 timewheel timewheel = getoverflowwheel(); return timewheel.add(entry); } return false; } private timewheel getoverflowwheel() { if (overflowwheel == null) { synchronized (this) { if (overflowwheel == null) { overflowwheel = new timewheel(interval, wheelsize, currenttime, delayqueue); } } } return overflowwheel; } /** * 推进指针 * * @param timestamp */ public void advancelock(long timestamp) { if (timestamp > currenttime + tickms) { currenttime = timestamp - (timestamp % tickms); if (overflowwheel != null) { this.getoverflowwheel().advancelock(timestamp); } } } }
定时器接口
/** * 定时器 * @author apdoer * @version 1.0 * @date 2021/3/22 20:30 */ public interface timer { /** * 添加一个新任务 * * @param timertask */ void add(timertask timertask); /** * 推动指针 * * @param timeout */ void advanceclock(long timeout); /** * 等待执行的任务 * * @return */ int size(); /** * 关闭服务,剩下的无法被执行 */ void shutdown(); }
定时器实现
/** * @author apdoer * @version 1.0 * @date 2021/3/22 20:33 */ @slf4j public class systemtimer implements timer { /** * 底层时间轮 */ private timewheel timewheel; /** * 一个timer只有一个延时队列 */ private delayqueue<timertasklist> delayqueue = new delayqueue<>(); /** * 过期任务执行线程 */ private executorservice workerthreadpool; /** * 轮询delayqueue获取过期任务线程 */ private executorservice bossthreadpool; public systemtimer() { this.timewheel = new timewheel(1, 20, system.currenttimemillis(), delayqueue); this.workerthreadpool = executors.newfixedthreadpool(100); this.bossthreadpool = executors.newfixedthreadpool(1); //20ms推动一次时间轮运转 this.bossthreadpool.submit(() -> { for (; ; ) { this.advanceclock(20); } }); } public void addtimertaskentry(timertaskentry entry) { if (!timewheel.add(entry)) { //已经过期了 timertask timertask = entry.gettimertask(); log.info("=====任务:{} 已到期,准备执行============",timertask.getdesc()); workerthreadpool.submit(timertask); } } @override public void add(timertask timertask) { log.info("=======添加任务开始====task:{}", timertask.getdesc()); timertaskentry entry = new timertaskentry(timertask, timertask.getdelayms() + system.currenttimemillis()); timertask.settimertaskentry(entry); addtimertaskentry(entry); } /** * 推动指针运转获取过期任务 * * @param timeout 时间间隔 * @return */ @override public synchronized void advanceclock(long timeout) { try { timertasklist bucket = delayqueue.poll(timeout, timeunit.milliseconds); if (bucket != null) { //推进时间 timewheel.advancelock(bucket.getexpiration()); //执行过期任务(包含降级) bucket.clear(this::addtimertaskentry); } } catch (interruptedexception e) { log.error("advanceclock error"); } } @override public int size() { //todo return 0; } @override public void shutdown() { this.bossthreadpool.shutdown(); this.workerthreadpool.shutdown(); this.timewheel = null; } }
存储任务的环形链表
/** * @author apdoer * @version 1.0 * @date 2021/3/22 19:26 */ @data @slf4j class timertasklist implements delayed { /** * timertasklist 环形链表使用一个虚拟根节点root */ private timertaskentry root = new timertaskentry(null, -1); { root.next = root; root.prev = root; } /** * bucket的过期时间 */ private atomiclong expiration = new atomiclong(-1l); public long getexpiration() { return expiration.get(); } /** * 设置bucket的过期时间,设置成功返回true * * @param expirationms * @return */ boolean setexpiration(long expirationms) { return expiration.getandset(expirationms) != expirationms; } public boolean addtask(timertaskentry entry) { boolean done = false; while (!done) { //如果timertaskentry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止 entry.remove(); synchronized (this) { if (entry.timedtasklist == null) { //加到链表的末尾 entry.timedtasklist = this; timertaskentry tail = root.prev; entry.prev = tail; entry.next = root; tail.next = entry; root.prev = entry; done = true; } } } return true; } /** * 从 timedtasklist 移除指定的 timertaskentry * * @param entry */ public void remove(timertaskentry entry) { synchronized (this) { if (entry.gettimedtasklist().equals(this)) { entry.next.prev = entry.prev; entry.prev.next = entry.next; entry.next = null; entry.prev = null; entry.timedtasklist = null; } } } /** * 移除所有 */ public synchronized void clear(consumer<timertaskentry> entry) { timertaskentry head = root.next; while (!head.equals(root)) { remove(head); entry.accept(head); head = root.next; } expiration.set(-1l); } @override public long getdelay(timeunit unit) { return math.max(0, unit.convert(expiration.get() - system.currenttimemillis(), timeunit.milliseconds)); } @override public int compareto(delayed o) { if (o instanceof timertasklist) { return long.compare(expiration.get(), ((timertasklist) o).expiration.get()); } return 0; } }
存储任务的容器entry
/** * @author apdoer * @version 1.0 * @date 2021/3/22 19:26 */ @data class timertaskentry implements comparable<timertaskentry> { private timertask timertask; private long expirems; volatile timertasklist timedtasklist; timertaskentry next; timertaskentry prev; public timertaskentry(timertask timedtask, long expirems) { this.timertask = timedtask; this.expirems = expirems; this.next = null; this.prev = null; } void remove() { timertasklist currentlist = timedtasklist; while (currentlist != null) { currentlist.remove(this); currentlist = timedtasklist; } } @override public int compareto(timertaskentry o) { return ((int) (this.expirems - o.expirems)); } }
任务包装类(这里也可以将工作任务以线程变量的方式去传入)
@data @slf4j class timertask implements runnable { /** * 延时时间 */ private long delayms; /** * 任务所在的entry */ private timertaskentry timertaskentry; private string desc; public timertask(string desc, long delayms) { this.desc = desc; this.delayms = delayms; this.timertaskentry = null; } public synchronized void settimertaskentry(timertaskentry entry) { // 如果这个timetask已经被一个已存在的timertaskentry持有,先移除一个 if (timertaskentry != null && timertaskentry != entry) { timertaskentry.remove(); } timertaskentry = entry; } public timertaskentry gettimertaskentry() { return timertaskentry; } @override public void run() { log.info("============={}任务执行", desc); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。