欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

.net core 实现基于 cron 表达式的任务调度

程序员文章站 2022-06-20 23:13:55
上次我们实现了一个简单的基于 Timer 的定时任务,但是使用过程中慢慢发现这种方式可能并不太合适,有些任务可能只希望在某个时间段内执行,只使用 timer 就显得不是那么灵活了,希望可以像 quartz 那样指定一个 cron 表达式来指定任务的执行时间。 ......

.net core 实现基于 cron 表达式的任务调度

intro

上次我们实现了一个简单的基于 timer 的定时任务,详细信息可以看。

但是使用过程中慢慢发现这种方式可能并不太合适,有些任务可能只希望在某个时间段内执行,只使用 timer 就显得不是那么灵活了,希望可以像 quartz 那样指定一个 cron 表达式来指定任务的执行时间。

cron 表达式介绍

cron 常见于unix类unix操作系统之中,用于设置周期性被执行的指令。该命令从标准输入设备读取指令,并将其存放于“crontab”文件中,以供之后读取和执行。该词来源于希腊语 chronos(χρόνος),原意是时间。

通常,crontab储存的指令被守护进程激活,crond 常常在后台运行,每一分钟检查是否有预定的作业需要执行。这类作业一般称为cron jobs

cron 可以比较准确的描述周期性执行任务的执行时间,标准的 cron 表达式是五位:

30 4 * * ? 五个位置上的值分别对应 分钟/小时/日期/月份/周(day of week)

现在有一些扩展,有6位的,也有7位的,6位的表达式第一个对应的是秒,7个的第一个对应是秒,最后一个对应的是年份

0 0 12 * * ? 每天中午12点
0 15 10 ? * * 每天 10:15
0 15 10 * * ? 每天 10:15
30 15 10 * * ? * 每天 10:15:30
0 15 10 * * ? 2005 2005年每天 10:15

详细信息可以参考:

.net core cron service

cron 解析库 使用的是 https://github.com/hangfireio/cronos
,支持五位/六位,暂不支持年份的解析(7位)

基于 backgroundservice 的 cron 定时服务,实现如下:

public abstract class cronscheduleservicebase : backgroundservice
{
        /// <summary>
        /// job cron trigger expression
        /// refer to: http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html
        /// </summary>
        public abstract string cronexpression { get; }

        protected abstract bool concurrentallowed { get; }

        protected readonly ilogger logger;

        private readonly string jobclientscache = "jobclientshash";

        protected cronscheduleservicebase(ilogger logger)
        {
            logger = logger;
        }

        protected abstract task processasync(cancellationtoken cancellationtoken);

        protected override async task executeasync(cancellationtoken stoppingtoken)
        {
            {
                var next = cronhelper.getnextoccurrence(cronexpression);
                while (!stoppingtoken.iscancellationrequested && next.hasvalue)
                {
                    var now = datetimeoffset.utcnow;

                    if (now >= next)
                    {
                        if (concurrentallowed)
                        {
                            _ = processasync(stoppingtoken);
                            next = cronhelper.getnextoccurrence(cronexpression);
                            if (next.hasvalue)
                            {
                                logger.loginformation("next at {next}", next);
                            }
                        }
                        else
                        {
                            var machinename = redismanager.hashclient.getorset(jobclientscache, gettype().fullname, () => environment.machinename); // try get job master
                            if (machinename == environment.machinename) // ismaster
                            {
                                using (var locker = redismanager.getredlockclient($"{gettype().fullname}_cronservice"))
                                {
                                    // redis 互斥锁
                                    if (await locker.trylockasync())
                                    {
                                        // 执行 job
                                        await processasync(stoppingtoken);

                                        next = cronhelper.getnextoccurrence(cronexpression);
                                        if (next.hasvalue)
                                        {
                                            logger.loginformation("next at {next}", next);
                                            await task.delay(next.value - datetimeoffset.utcnow, stoppingtoken);
                                        }
                                    }
                                    else
                                    {
                                        logger.loginformation($"failed to acquire lock");
                                    }
                                }
                            }
                        }
                    }
                    else
                    {
                        // needed for graceful shutdown for some reason.
                        // 1000ms so it doesn't affect calculating the next
                        // cron occurence (lowest possible: every second)
                        await task.delay(1000, stoppingtoken);
                    }
                }
            }
        }

        public override task stopasync(cancellationtoken cancellationtoken)
        {
            redismanager.hashclient.remove(jobclientscache, gettype().fullname); // unregister from jobclients
            return base.stopasync(cancellationtoken);
        }
    }

因为网站部署在多台机器上,所以为了防止并发执行,使用 redis 做了一些事情,job执行的时候尝试获取 redis 中 job 对应的 master 的 hostname,没有的话就设置为当前机器的 hostname,在 job 停止的时候也就是应用停止的时候,删除 redis 中当前 job 对应的 master,job执行的时候判断是否是 master 节点,是 master 才执行job,不是 master 则不执行。完整实现代码:https://github.com/weihanli/activityreservation/blob/dev/activityreservation.helper/services/cronscheduleservicebase.cs#l11

定时 job 示例:

public class removeoverduereservationservice : cronscheduleservicebase
{
    private readonly iserviceprovider _serviceprovider;
    private readonly iconfiguration _configuration;

    public removeoverduereservationservice(ilogger<removeoverduereservationservice> logger,
        iserviceprovider serviceprovider, iconfiguration configuration) : base(logger)
    {
        _serviceprovider = serviceprovider;
        _configuration = configuration;
    }

    public override string cronexpression => _configuration.getappsetting("removeoverduereservationcron") ?? "0 0 18 * * ?";

    protected override bool concurrentallowed => false;

    protected override async task processasync(cancellationtoken cancellationtoken)
    {
        using (var scope = _serviceprovider.createscope())
        {
            var reservationrepo = scope.serviceprovider.getrequiredservice<iefrepository<reservationdbcontext, reservation>>();
            await reservationrepo.deleteasync(reservation => reservation.reservationstatus == 0 && (reservation.reservationfordate < datetime.today.adddays(-3)));
        }
    }
}

完整实现代码:https://github.com/weihanli/activityreservation/blob/dev/activityreservation.helper/services/removeoverduereservationservice.cs

memo

使用 redis 这种方式来决定 master 并不是特别可靠,正常结束的没有什么问题,最好还是用比较成熟的服务注册发现框架比较好

reference