欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Tornado集成Apscheduler定时任务

程序员文章站 2022-06-22 21:17:12
熟悉Python的人可能都知道,Apscheduler是python里面一款非常优秀的任务调度框架,这个框架是从鼎鼎大名的Quartz移植而来。 之前有用过Flask版本的Apscheduler做定时任务。刚好前不久接触了Tornado,顺便玩玩Tornado版本的Apscheduler。 本篇做了 ......

熟悉python的人可能都知道,apscheduler是python里面一款非常优秀的任务调度框架,这个框架是从鼎鼎大名的quartz移植而来。

之前有用过flask版本的apscheduler做定时任务。刚好前不久接触了tornado,顺便玩玩tornado版本的apscheduler。

本篇做了一个简单的wdb页面,用于添加和删除定时任务,小伙伴们可以基于这个做一些扩展,比如把定时定时任务写入数据库,改变cron规则等等。

#新增任务(需要动态改变job_id的值)

http://localhost:8888/scheduler?job_id=1&action=add

#删除任务(需要动态改变job_id的值)

http://localhost:8888/scheduler?job_id=1&action=remov

执行结果可以在console看到

 

from datetime import datetime
from tornado.ioloop import ioloop, periodiccallback
from tornado.web import requesthandler, application
from apscheduler.schedulers.tornado import tornadoscheduler


scheduler = none
job_ids   = []

# 初始化
def init_scheduler():
    global scheduler
    scheduler = tornadoscheduler()
    scheduler.start()
    print('[scheduler init]apscheduler has been started')

# 要执行的定时任务在这里
def task1(options):
    print('{} [apscheduler][task]-{}'.format(datetime.now().strftime('%y-%m-%d %h:%m:%s.%f'), options))


class mainhandler(requesthandler):
    def get(self):
        self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')


class schedulerhandler(requesthandler):
    def get(self):
        global job_ids
        job_id = self.get_query_argument('job_id', none)
        action = self.get_query_argument('action', none)
        if job_id:
            # add
            if 'add' == action:
                if job_id not in job_ids:
                    job_ids.append(job_id)
                    scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))
                    self.write('[task added] - {}'.format(job_id))
                else:
                    self.write('[task exists] - {}'.format(job_id))
            # remove
            elif 'remove' == action:
                if job_id in job_ids:
                    scheduler.remove_job(job_id)
                    job_ids.remove(job_id)
                    self.write('[task removed] - {}'.format(job_id))
                else:
                    self.write('[task not found] - {}'.format(job_id))
        else:
            self.write('[invalid params] invalid job_id or action')


if __name__ == "__main__":
    routes    = [
        (r"/", mainhandler),
        (r"/scheduler/?", schedulerhandler),
    ]
    init_scheduler()
    app       = application(routes, debug=true)
    app.listen(8888)
    ioloop.current().start()