推荐一款高效的处理延迟任务神器
程序员文章站
2022-06-23 22:30:48
时间轮算法 时间轮是一种高效、低延迟的调度数据结构。其在Linux内核中广泛使用,是Linux内核定时器的实现方法和基础之一。按使用场景,大致可以分为两种时间轮:原始时间轮和分层时间轮。分层时间轮是原始时间轮的升级版本,来应对时间“槽”数量比较大的情况,对内存和精度都有很高要求的情况。延迟任务的场景 ......
时间轮算法
时间轮是一种高效、低延迟的调度数据结构。其在linux内核中广泛使用,是linux内核定时器的实现方法和基础之一。按使用场景,大致可以分为两种时间轮:原始时间轮和分层时间轮。分层时间轮是原始时间轮的升级版本,来应对时间“槽”数量比较大的情况,对内存和精度都有很高要求的情况。延迟任务的场景一般只需要用到原始时间轮就可以了。
代码案例
推荐使用netty
提供的hashedwheeltimer
工具类来实现延迟任务。
引入依赖:
<dependency> <groupid>io.netty</groupid> <artifactid>netty-common</artifactid> <version>4.1.23.final</version> </dependency>
红包过期队列信息:
/** * 红包过期队列信息 */ public class redpackettimertask implements timertask { private static final datetimeformatter f = datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss.sss"); /** * 红包 id */ private final long redpacketid; /** * 创建时间戳 */ private final long timestamp; public redpackettimertask(long redpacketid) { this.redpacketid = redpacketid; this.timestamp = system.currenttimemillis(); } @override public void run(timeout timeout) { //异步处理任务 system.out.println(string.format("任务执行时间:%s,红包创建时间:%s,红包id:%s", localdatetime.now().format(f), localdatetime.ofinstant(instant.ofepochmilli(timestamp), zoneid.systemdefault()).format(f), redpacketid)); } }
测试用例:
/** * 基于 netty 的时间轮算法 hashedwheeltimer 实现的延迟任务 */ public class redpackethashedwheeltimer { private static final datetimeformatter f = datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss.sss"); public static void main(string[] args) throws exception { threadfactory factory = r -> { thread thread = new thread(r); thread.setdaemon(true); thread.setname("redpackethashedwheeltimerworker"); return thread; }; /** * @param tickduration - 每tick一次的时间间隔 * @param unit - tickduration 的时间单位 * @param ticksperwheel - 时间轮中的槽数 * @param leakdetection - 检查内存溢出 */ timer timer = new hashedwheeltimer(factory, 1, timeunit.seconds, 100,true); system.out.println(string.format("开始任务时间:%s",localdatetime.now().format(f))); for(int i=1;i<10;i++){ timertask timertask = new redpackettimertask(i); timer.newtimeout(timertask, i, timeunit.seconds); } thread.sleep(integer.max_value); } }
打印任务执行日志:
开始任务时间:2020-02-12 15:22:23.404 任务执行时间:2020-02-12 15:22:25.410,红包创建时间:2020-02-12 15:22:23.409,红包id:1 任务执行时间:2020-02-12 15:22:26.411,红包创建时间:2020-02-12 15:22:23.414,红包id:2 任务执行时间:2020-02-12 15:22:27.424,红包创建时间:2020-02-12 15:22:23.414,红包id:3 任务执行时间:2020-02-12 15:22:28.410,红包创建时间:2020-02-12 15:22:23.414,红包id:4 任务执行时间:2020-02-12 15:22:29.411,红包创建时间:2020-02-12 15:22:23.414,红包id:5 任务执行时间:2020-02-12 15:22:30.409,红包创建时间:2020-02-12 15:22:23.414,红包id:6 任务执行时间:2020-02-12 15:22:31.411,红包创建时间:2020-02-12 15:22:23.414,红包id:7 任务执行时间:2020-02-12 15:22:32.409,红包创建时间:2020-02-12 15:22:23.414,红包id:8 任务执行时间:2020-02-12 15:22:33.411,红包创建时间:2020-02-12 15:22:23.414,红包id:9
源码相关
其核心是workerthread
线程,主要负责每过tickduration
时间就累加一次tick
。同时也负责执行到期的timeout
任务以及添加timeout
任务到指定的wheel
中。
构造方法:
public hashedwheeltimer( threadfactory threadfactory, long tickduration, timeunit unit, int ticksperwheel, boolean leakdetection, long maxpendingtimeouts) { if (threadfactory == null) { throw new nullpointerexception("threadfactory"); } if (unit == null) { throw new nullpointerexception("unit"); } if (tickduration <= 0) { throw new illegalargumentexception("tickduration must be greater than 0: " + tickduration); } if (ticksperwheel <= 0) { throw new illegalargumentexception("ticksperwheel must be greater than 0: " + ticksperwheel); } // normalize ticksperwheel to power of two and initialize the wheel. wheel = createwheel(ticksperwheel); mask = wheel.length - 1; // convert tickduration to nanos. this.tickduration = unit.tonanos(tickduration); // prevent overflow. if (this.tickduration >= long.max_value / wheel.length) { throw new illegalargumentexception(string.format( "tickduration: %d (expected: 0 < tickduration in nanos < %d", tickduration, long.max_value / wheel.length)); } //这里-爪洼笔记 workerthread = threadfactory.newthread(worker); leak = leakdetection || !workerthread.isdaemon() ? leakdetector.track(this) : null; this.maxpendingtimeouts = maxpendingtimeouts; if (instance_counter.incrementandget() > instance_count_limit && warned_too_many_instances.compareandset(false, true)) { reporttoomanyinstances(); } }
新增任务,创建即启动:
public timeout newtimeout(timertask task, long delay, timeunit unit) { if (task == null) { throw new nullpointerexception("task"); } if (unit == null) { throw new nullpointerexception("unit"); } long pendingtimeoutscount = pendingtimeouts.incrementandget(); if (maxpendingtimeouts > 0 && pendingtimeoutscount > maxpendingtimeouts) { pendingtimeouts.decrementandget(); throw new rejectedexecutionexception("number of pending timeouts (" + pendingtimeoutscount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxpendingtimeouts + ")"); } //这里-爪洼笔记 start(); // add the timeout to the timeout queue which will be processed on the next tick. // during processing all the queued hashedwheeltimeouts will be added to the correct hashedwheelbucket. long deadline = system.nanotime() + unit.tonanos(delay) - starttime; // guard against overflow. if (delay > 0 && deadline < 0) { deadline = long.max_value; } hashedwheeltimeout timeout = new hashedwheeltimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
线程启动:
/** * starts the background thread explicitly. the background thread will * start automatically on demand even if you did not call this method. * * @throws illegalstateexception if this timer has been * {@linkplain #stop() stopped} already */ public void start() { switch (worker_state_updater.get(this)) { case worker_state_init: if (worker_state_updater.compareandset(this, worker_state_init, worker_state_started)) { workerthread.start(); } break; case worker_state_started: break; case worker_state_shutdown: throw new illegalstateexception("cannot be started once stopped"); default: throw new error("invalid workerstate"); } // wait until the starttime is initialized by the worker. while (starttime == 0) { try { starttimeinitialized.await(); } catch (interruptedexception ignore) { // ignore - it will be ready very soon. } } }
执行相关操作:
public void run() { // initialize the starttime. starttime = system.nanotime(); if (starttime == 0) { // we use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. starttime = 1; } // notify the other threads waiting for the initialization at start(). starttimeinitialized.countdown(); do { final long deadline = waitfornexttick(); if (deadline > 0) { int idx = (int) (tick & mask); processcancelledtasks(); hashedwheelbucket bucket = wheel[idx]; transfertimeoutstobuckets(); bucket.expiretimeouts(deadline); tick++; } } while (worker_state_updater.get(hashedwheeltimer.this) == worker_state_started); // fill the unprocessedtimeouts so we can return them from stop() method. for (hashedwheelbucket bucket: wheel) { bucket.cleartimeouts(unprocessedtimeouts); } for (;;) { hashedwheeltimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.iscancelled()) { unprocessedtimeouts.add(timeout); } } processcancelledtasks(); }
小结
以上方案并没有实现持久化和分布式,生产环境可根据实际业务需求选择使用。
源码
https://gitee.com/52itstyle/spring-boot-seckill
上一篇: 分布式系统的CAP定理
下一篇: 微信抢红包过期失效实战案例