Django使用Celery异步任务队列的使用
1 celery简介
celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行。
任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(mq、redis)。
1.1 celery原理
celery的 架构 由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件:celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括, rabbitmq , redis , mongodb (experimental), amazon sqs (experimental),couchdb (experimental), sqlalchemy (experimental),django orm (experimental), ironmq。推荐使用:rabbitmq、redis作为消息队列。
任务执行单元:worker是celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:task result store用来存储worker执行的任务的结果,celery支持以不同方式存储任务的结果,包括amqp, redis,memcached, mongodb,sqlalchemy, django orm,apache cassandra, ironcache
1.2celery适用场景
异步任务处理:例如给注册用户发送短消息或者确认邮件任务。 大型任务:执行时间较长的任务,例如视频和图片处理,添加水印和转码等,需要执行任务时间长。 定时执行的任务:支持任务的定时执行和设定时间执行。例如性能压测定时执行。
2celery开发环境准备
2.1 环境准备
软件名称 |
版本号 |
说明 |
linux |
centos 6.5(64bit) |
操作系统 |
python |
3.5.2 |
|
django |
1.10 |
web框架 |
celery |
4.0.2 |
异步任务队列 |
redis |
2.4 |
消息队列 |
2.2 celery安装
使用方法介绍:
celery的运行依赖消息队列,使用时需要安装redis或者rabbit。
这里我们使用redis。安装redis库:
sudo yum install redis
启动redis:
sudo service redis start
安装celery库
sudo pip install celery==4.0.2
3celery单独执行任务
3.1编写任务
创建task.py文件
说明:这里初始celery实例时就加载了配置,使用的redis作为消息队列和存储任务结果。
运行celery:
$ celery -a task worker --loglevel=info
看到下面的打印,说明celery成功运行。
3.2 调用任务
直接打开python交互命令行
执行下面代码:
可以celery的窗口看到任务的执行信息
任务执行状态监控和获取结果:
3.3任务调用方法总结
有两种方法:
delay和apply_async ,delay方法是apply_async简化版。
add.delay(2, 2) add.apply_async((2, 2)) add.apply_async((2, 2), queue='lopri')
delay方法是apply_async简化版本。
apply_async方法是可以带非常多的配置参数,包括指定队列等
queue 指定队列名称,可以把不同任务分配到不同的队列 3.4 任务状态
每个任务有三种状态:pending -> started -> success
任务查询状态:res.state
来查询任务的状态
4与django集成
上面简单介绍了celery异步任务的基本方法,结合我们实际的应用,我们需要与django一起使用,下面介绍如何与django结合。
4.1与django集成方法
与django集成有两种方法:
- django 1.8 以上版本:与celery 4.0版本集成
- django 1.8 以下版本:与celery3.1版本集成,使用django-celery库
今天我们介绍celery4.0 和django 1.8以上版本集成方法。
4.2 创建项目文件
创建一个项目:名字叫做proj
- proj/ - proj/__init__.py - proj/settings.py - proj/urls.py - proj/wsgi.py - manage.py
创建一个新的文件: proj/proj/mycelery.py
from __future__ import absolute_import, unicode_literals import os from celery import celery # set the default django settings module for the 'celery' program. os.environ.setdefault('django_settings_module', 'proj.settings') app = celery('proj') # using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='celery' means all celery-related configuration keys # should have a `celery_` prefix. app.config_from_object('django.conf:settings', namespace='celery') # load task modules from all registered django app configs. app.autodiscover_tasks()
在proj/proj/__init__.py:添加
from __future__ import absolute_import, unicode_literals # this will make sure the app is always imported when # django starts so that shared_task will use this app. from .mycelery import app as celery_app __all__ = ['celery_app']
4.3 配置celery
我们在mycelery.py文件中说明celery的配置文件在settings.py中,并且是以celery开头。
app.config_from_object('django.conf:settings', namespace='celery')
在settings.py文件中添加celery配置:
我们的配置是使用redis作为消息队列,消息的代理和结果都是用redis,任务的序列化使用json格式。
重要:redis://127.0.0.1:6379/0这个说明使用的redis的0号队列,如果有多个celery任务都使用同一个队列,则会造成任务混乱。最好是celery实例单独使用一个队列。
4.4创建app
创建django的app,名称为celery_task,在app目录下创建tasks.py文件。
完成后目录结构为:
├── celery_task │ ├── admin.py │ ├── apps.py │ ├── __init__.py │ ├── migrations │ │ └── __init__.py │ ├── models.py │ ├── tasks.py │ ├── tests.py │ └── views.py ├── db.sqlite3 ├── manage.py ├── proj │ ├── celery.py │ ├── __init__.py │ ├── settings.py │ ├── urls.py │ └── wsgi.py └── templates
4.5编写task任务
编辑任务文件
tasks.py
在tasks.py文件中添加下面代码
# create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
启动celery:celery -a proj.mycelery worker -l info
说明:proj 为模块名称,mycelery 为celery 的实例所在的文件。
启动成功打印:
4.6在views中调用任务
在views中编写接口,实现两个功能:
- 触发任务,然后返回任务的结果和任务id
- 根据任务id查询任务状态
代码如下:
启动django。
新开一个会话启动celery;启动命令为:celery –a proj.mycelery worker –l info
访问 http://127.0.0.1:8000/add ,可以看到返回的结果。
在celery运行的页面,可以看到下面输出:
4.7在views中查询任务状态
有的时候任务执行时间较长,需要查询任务是否执行完成,可以根据任务的id来查询任务状态,根据状态进行下一步操作。
可以看到任务的状态为:success
5celery定时任务
celery作为异步任务队列,我们可以按照我们设置的时间,定时的执行一些任务,例如每日数据库备份,日志转存等。
celery的定时任务配置非常简单:
定时任务的配置依然在setting.py文件中。
说明:如果觉得celery 的数据配置文件和django 的都在setting.py 一个文件中不方便,可以分拆出来,只需要在mycelery.py 的文件中指明即可。
app.config_from_object('django.conf:yoursettingsfile', namespace='celery')
5.1任务间隔运行
#每30秒调用task.add from datetime import timedelta celery_beat_schedule = { 'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': timedelta(seconds=30), 'args': (16, 16) }, }
5.2定时执行
定时每天早上7:30分运行。
注意:设置任务时间时注意时间格式,utc时间或者本地时间。
#crontab任务 #每天7:30调用task.add from celery.schedules import crontab celery_beat_schedule = { # executes every monday morning at 7:30 a.m 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30), 'args': (16, 16), }, }
5.3定时任务启动
配置了定时任务,除了worker进程外,还需要启动一个beat进程。
beat进程的作用就相当于一个定时任务,根据配置来执行对应的任务。
5.3.1 启动beat进程
命令如下:celery -a proj.mycelery beat -l info
5.3.2 启动worker进程
worker进程启动和前面启动命令一样。celery –a proj.mycelery worker –l info
6 celery深入
celery任务支持多样的运行模式:
- 支持动态指定并发数 --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
- 支持链式任务
- 支持group任务
- 支持任务不同优先级
- 支持指定任务队列
- 支持使用eventlet模式运行worker
例如:指定并发数为1000
celery -a proj.mycelery worker -c 1000
这些可以根据使用的深入自行了解和学习。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: python2.7安装图文教程