欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

推荐一款高效的处理延迟任务神器

程序员文章站 2022-03-21 16:30:24
时间轮算法 时间轮是一种高效、低延迟的调度数据结构。其在Linux内核中广泛使用,是Linux内核定时器的实现方法和基础之一。按使用场景,大致可以分为两种时间轮:原始时间轮和分层时间轮。分层时间轮是原始时间轮的升级版本,来应对时间“槽”数量比较大的情况,对内存和精度都有很高要求的情况。延迟任务的场景 ......

推荐一款高效的处理延迟任务神器

时间轮算法

时间轮是一种高效、低延迟的调度数据结构。其在linux内核中广泛使用,是linux内核定时器的实现方法和基础之一。按使用场景,大致可以分为两种时间轮:原始时间轮和分层时间轮。分层时间轮是原始时间轮的升级版本,来应对时间“槽”数量比较大的情况,对内存和精度都有很高要求的情况。延迟任务的场景一般只需要用到原始时间轮就可以了。

代码案例

推荐使用netty提供的hashedwheeltimer工具类来实现延迟任务。

引入依赖:

<dependency>
      <groupid>io.netty</groupid>
      <artifactid>netty-common</artifactid>
      <version>4.1.23.final</version>
</dependency>

红包过期队列信息:

/**
 * 红包过期队列信息
 */
public class redpackettimertask implements timertask {

    private static final datetimeformatter f = datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss.sss");

    /**
     * 红包 id
     */
    private final long redpacketid;

    /**
     * 创建时间戳
     */
    private final long timestamp;

    public redpackettimertask(long redpacketid) {
        this.redpacketid = redpacketid;
        this.timestamp = system.currenttimemillis();
    }

    @override
    public void run(timeout timeout) {
        //异步处理任务
        system.out.println(string.format("任务执行时间:%s,红包创建时间:%s,红包id:%s",
                localdatetime.now().format(f), localdatetime.ofinstant(instant.ofepochmilli(timestamp), zoneid.systemdefault()).format(f), redpacketid));
    }
}

测试用例:

/**
 * 基于 netty 的时间轮算法 hashedwheeltimer 实现的延迟任务
 */
public class redpackethashedwheeltimer {

    private static final datetimeformatter f = datetimeformatter.ofpattern("yyyy-mm-dd hh:mm:ss.sss");

    public static void main(string[] args) throws exception {
        threadfactory factory = r -> {
            thread thread = new thread(r);
            thread.setdaemon(true);
            thread.setname("redpackethashedwheeltimerworker");
            return thread;
        };
        /**
         * @param tickduration - 每tick一次的时间间隔
         * @param unit - tickduration 的时间单位
         * @param ticksperwheel - 时间轮中的槽数
         * @param leakdetection - 检查内存溢出
         */
        timer timer = new hashedwheeltimer(factory, 1,
                                           timeunit.seconds, 100,true);
        system.out.println(string.format("开始任务时间:%s",localdatetime.now().format(f)));
        for(int i=1;i<10;i++){
            timertask timertask = new redpackettimertask(i);
            timer.newtimeout(timertask, i, timeunit.seconds);
        }
        thread.sleep(integer.max_value);
    }
}

打印任务执行日志:

开始任务时间:2020-02-12 15:22:23.404
任务执行时间:2020-02-12 15:22:25.410,红包创建时间:2020-02-12 15:22:23.409,红包id:1
任务执行时间:2020-02-12 15:22:26.411,红包创建时间:2020-02-12 15:22:23.414,红包id:2
任务执行时间:2020-02-12 15:22:27.424,红包创建时间:2020-02-12 15:22:23.414,红包id:3
任务执行时间:2020-02-12 15:22:28.410,红包创建时间:2020-02-12 15:22:23.414,红包id:4
任务执行时间:2020-02-12 15:22:29.411,红包创建时间:2020-02-12 15:22:23.414,红包id:5
任务执行时间:2020-02-12 15:22:30.409,红包创建时间:2020-02-12 15:22:23.414,红包id:6
任务执行时间:2020-02-12 15:22:31.411,红包创建时间:2020-02-12 15:22:23.414,红包id:7
任务执行时间:2020-02-12 15:22:32.409,红包创建时间:2020-02-12 15:22:23.414,红包id:8
任务执行时间:2020-02-12 15:22:33.411,红包创建时间:2020-02-12 15:22:23.414,红包id:9

源码相关

其核心是workerthread线程,主要负责每过tickduration时间就累加一次tick。同时也负责执行到期的timeout任务以及添加timeout任务到指定的wheel中。

构造方法:

public hashedwheeltimer(
            threadfactory threadfactory,
            long tickduration, timeunit unit, int ticksperwheel, boolean leakdetection,
            long maxpendingtimeouts) {

        if (threadfactory == null) {
            throw new nullpointerexception("threadfactory");
        }
        if (unit == null) {
            throw new nullpointerexception("unit");
        }
        if (tickduration <= 0) {
            throw new illegalargumentexception("tickduration must be greater than 0: " + tickduration);
        }
        if (ticksperwheel <= 0) {
            throw new illegalargumentexception("ticksperwheel must be greater than 0: " + ticksperwheel);
        }

        // normalize ticksperwheel to power of two and initialize the wheel.
        wheel = createwheel(ticksperwheel);
        mask = wheel.length - 1;

        // convert tickduration to nanos.
        this.tickduration = unit.tonanos(tickduration);

        // prevent overflow.
        if (this.tickduration >= long.max_value / wheel.length) {
            throw new illegalargumentexception(string.format(
                    "tickduration: %d (expected: 0 < tickduration in nanos < %d",
                    tickduration, long.max_value / wheel.length));
        }
        //这里-爪洼笔记
        workerthread = threadfactory.newthread(worker);

        leak = leakdetection || !workerthread.isdaemon() ? leakdetector.track(this) : null;

        this.maxpendingtimeouts = maxpendingtimeouts;

        if (instance_counter.incrementandget() > instance_count_limit &&
            warned_too_many_instances.compareandset(false, true)) {
            reporttoomanyinstances();
        }
}

新增任务,创建即启动:

public timeout newtimeout(timertask task, long delay, timeunit unit) {
        if (task == null) {
            throw new nullpointerexception("task");
        }
        if (unit == null) {
            throw new nullpointerexception("unit");
        }

        long pendingtimeoutscount = pendingtimeouts.incrementandget();

        if (maxpendingtimeouts > 0 && pendingtimeoutscount > maxpendingtimeouts) {
            pendingtimeouts.decrementandget();
            throw new rejectedexecutionexception("number of pending timeouts ("
                + pendingtimeoutscount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxpendingtimeouts + ")");
        }
        //这里-爪洼笔记
        start();

        // add the timeout to the timeout queue which will be processed on the next tick.
        // during processing all the queued hashedwheeltimeouts will be added to the correct hashedwheelbucket.
        long deadline = system.nanotime() + unit.tonanos(delay) - starttime;

        // guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = long.max_value;
        }
        hashedwheeltimeout timeout = new hashedwheeltimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
}

线程启动:

   /**
     * starts the background thread explicitly.  the background thread will
     * start automatically on demand even if you did not call this method.
     *
     * @throws illegalstateexception if this timer has been
     *                               {@linkplain #stop() stopped} already
     */
    public void start() {
        switch (worker_state_updater.get(this)) {
            case worker_state_init:
                if (worker_state_updater.compareandset(this, worker_state_init, worker_state_started)) {
                    workerthread.start();
                }
                break;
            case worker_state_started:
                break;
            case worker_state_shutdown:
                throw new illegalstateexception("cannot be started once stopped");
            default:
                throw new error("invalid workerstate");
        }

        // wait until the starttime is initialized by the worker.
        while (starttime == 0) {
            try {
                starttimeinitialized.await();
            } catch (interruptedexception ignore) {
                // ignore - it will be ready very soon.
            }
        }
    }

执行相关操作:

public void run() {
            // initialize the starttime.
            starttime = system.nanotime();
            if (starttime == 0) {
                // we use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                starttime = 1;
            }

            // notify the other threads waiting for the initialization at start().
            starttimeinitialized.countdown();

            do {
                final long deadline = waitfornexttick();
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    processcancelledtasks();
                    hashedwheelbucket bucket =
                            wheel[idx];
                    transfertimeoutstobuckets();
                    bucket.expiretimeouts(deadline);
                    tick++;
                }
            } while (worker_state_updater.get(hashedwheeltimer.this) == worker_state_started);

            // fill the unprocessedtimeouts so we can return them from stop() method.
            for (hashedwheelbucket bucket: wheel) {
                bucket.cleartimeouts(unprocessedtimeouts);
            }
            for (;;) {
                hashedwheeltimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.iscancelled()) {
                    unprocessedtimeouts.add(timeout);
                }
            }
            processcancelledtasks();
}

小结

以上方案并没有实现持久化和分布式,生产环境可根据实际业务需求选择使用。

源码

https://gitee.com/52itstyle/spring-boot-seckill