欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Golang开源定时任务调度框架robfig/cron优化

程序员文章站 2022-06-16 19:52:52
项目中需要使用一个简单的定时任务调度的框架,最初直接从GitHub上搜了一个star比较多的,就是 https://github.com/robfig/cron 这个,目前有8000+ star。刚开始使用的时候发现问题不大,但是随着单机需要定时调度的任务越来越多,高峰期差不多接近500QPS,随着 ......

项目中需要使用一个简单的定时任务调度的框架,最初直接从github上搜了一个star比较多的,就是  这个,目前有8000+ star。刚开始使用的时候发现问题不大,但是随着单机需要定时调度的任务越来越多,高峰期差不多接近500qps,随着业务的推广使用,可以预期增长还会比较快,但是已经遇到cpu使用率偏高的问题,通过pprof分析,很多都是在做排序,看了下这个项目的代码,整体执行的过程大概如下:

1. 对所有任务进行排序,按照下次执行时间进行排序

2. 选择数组中第一个任务,计算下次执行时间减去当前时间得到时间t,然后sleep t

3. 然后从数组第一个元素开始遍历任务,如果此任务需要调度的时间<now,那么就执行此任务,执行之后重新计算这个任务的next执行时间

4. 每次待执行的任务执行完毕之后,都会重新对这个数组进行排序

5. 然后再循环从排好序的数组中找到第一个需要执行的任务去执行。

 

 

代码如下:

for {
        // determine the next entry to run.
        sort.sort(bytime(c.entries))

        var timer *time.timer
        if len(c.entries) == 0 || c.entries[0].next.iszero() {
            // if there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            timer = time.newtimer(100000 * time.hour)
        } else {
            timer = time.newtimer(c.entries[0].next.sub(now))
        }

        for {
            select {
            case now = <-timer.c:
                now = now.in(c.location)
                c.logger.info("wake", "now", now)

                // run every entry whose next time was less than now
                for _, e := range c.entries {
                    if e.next.after(now) || e.next.iszero() {
                        break
                    }
                    c.startjob(e.wrappedjob)
                    e.prev = e.next
                    e.next = e.schedule.next(now)
                    c.logger.info("run", "now", now, "entry", e.id, "next", e.next)
                }

            case newentry := <-c.add:
                timer.stop()
                now = c.now()
                newentry.next = newentry.schedule.next(now)
                c.entries = append(c.entries, newentry)
                c.logger.info("added", "now", now, "entry", newentry.id, "next", newentry.next)

            case replychan := <-c.snapshot:
                replychan <- c.entrysnapshot()
                continue

            case <-c.stop:
                timer.stop()
                c.logger.info("stop")
                return

            case id := <-c.remove:
                timer.stop()
                now = c.now()
                c.removeentry(id)
                c.logger.info("removed", "entry", id)
            }

            break
        }
    }

问题就显而易见了,执行一个任务(或几个任务)都重新计算next执行时间,重新排序,最坏情况就是每次执行1个任务,排序一遍,那么执行k个任务需要的时间的时间复杂度就是o(k*nlogn),这无疑是非常低效的。

于是想着怎么优化一下这个框架,不难想到每次找最先需要执行的任务就是从一堆任务中找schedule_time最小的那一个(设schedule_time是任务的执行时间),那么比较容易想到的思路就是使用最小堆:

1. 在初始化任务列表的时候就直接构建一个最小堆

2. 每次执行查看peek元素是否需要执行

3. 需要执行就pop堆顶元素,计算next执行时间,重新push入堆

4. 不需要执行就break到外层循环取堆顶元素,计算next_time-now() = need_sleep_time,然后select 睡眠、add、remove等操作。

 

我修改为min-heap的方式之后,每次添加任务的时候通过堆的属性进行up和down调整,每次添加任务时间复杂度o(logn),执行k个任务时间复杂度是o(klogn)。经过验证线上cpu使用降低4~5倍。cpu从50%左右降低至10%左右

 

优化后的代码如下,只是其中一部分,关键部分已经高亮。

全部的代码也已经在github上已经创建了一个fork的仓库并推送上去了,全部单测case也都pass。感兴趣可以点过去看。

    for {
        // determine the next entry to run.
        // use min-heap no need sort anymore


     // 这里不再需要排序了,因为add的时候直接进行堆调整
     //sort.sort(bytime(c.entries)) var timer *time.timer if len(c.entries) == 0 || c.entries[0].next.iszero() { // if there are no entries yet, just sleep - it still handles new entries // and stop requests. timer = time.newtimer(100000 * time.hour) } else { timer = time.newtimer(c.entries[0].next.sub(now)) //fmt.printf(" %v, %+v\n", c.entries[0].next.sub(now), c.entries[0].id) } for { select { case now = <-timer.c: now = now.in(c.location) c.logger.info("wake", "now", now) // run every entry whose next time was less than now for { e := c.entries.peek() if e.next.after(now) || e.next.iszero() { break } e = heap.pop(&c.entries).(*entry) c.startjob(e.wrappedjob) e.prev = e.next e.next = e.schedule.next(now) heap.push(&c.entries, e) c.logger.info("run", "now", now, "entry", e.id, "next", e.next) } case newentry := <-c.add: timer.stop() now = c.now() newentry.next = newentry.schedule.next(now) heap.push(&c.entries, newentry) c.logger.info("added", "now", now, "entry", newentry.id, "next", newentry.next) case replychan := <-c.snapshot: replychan <- c.entrysnapshot() continue case <-c.stop: timer.stop() c.logger.info("stop") return case id := <-c.remove: timer.stop() now = c.now() c.removeentry(id) c.logger.info("removed", "entry", id) } break } }