Celery 异步任务队列 之 周期性执行任务
程序员文章站
2022-07-14 18:33:49
...
一、周期性任务celery beat
celery beat是一个调度器,它可以周期内指定某个worker来执行某个任务。如果我们想周期执行某个任务需要增加
beat_schedule
配置信息
-
在celeryconfig.py新增以下beat_schedule 配置:
broker_url='redis://:@127.0.0.1:6379/1' result_backend='redis://:@127.0.0.1:6379/2' # 指定任务发到那个队列中 task_routes=({ 'proj.tasks.my_task5': {'queue': 'queue1'}, 'proj.tasks.my_task6': {'queue': 'queue1'}, 'proj.tasks.my_task7': {'queue': 'queue2'}, }, ) # 配置周期性任务, 或者定时任务 beat_schedule = { 'every-5-seconds': { 'task': 'proj.tasks.my_task8', 'schedule': 5.0, # 'args': (16, 16), } }
-
tasks.py模块内容如下:
from proj.celery import app as celery_app @celery_app.task def my_task1(a, b): print("my_task1任务正在执行....") return a + b @celery_app.task def my_task2(a, b): print("my_task2任务正在执行....") return a + b @celery_app.task def my_task3(a, b): print("my_task3任务正在执行....") return a + b @celery_app.task def my_task4(a, b): print("my_task3任务正在执行....") return a + b @celery_app.task def my_task5(): print("my_task5任务正在执行....") @celery_app.task def my_task6(): print("my_task6任务正在执行....") @celery_app.task def my_task7(): print("my_task7任务正在执行....") # 周期执行任务 @celery_app.task def my_task8(): print("my_task8任务正在执行....")
二、启动woker处理周期性任务
-
在终端执行以下命令即可激动周期性任务
celery -A proj worker --loglevel=info --beat
-
如果我们想指定在某天某时某分某秒执行某个任务,可以执行cron任务, 增加配置信息如下:
beat_schedule = { 'every-5-minute': { 'task': 'proj.tasks.period_task', 'schedule': 5.0, 'args': (16, 16), }, 'add-every-monday-morning': { 'task': 'proj.tasks.period_task', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, }
crontab例子: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
开启一个celery beat服务:celery -A proj beat
小贴士: 如果需要celery保存上次任务运行的时间在数据文件中,通过 -s 指定一个名为celerybeat-schedule的文件即可:celery -A proj beat -s /home/celery/var/run/celerybeat-schedule