在RedHat系Linux上部署Python的Celery框架的教程
celery (芹菜)是基于python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。
架构设计
celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
1. 消息中间件
celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,rabbitmq, redis, mongodb (experimental), amazon sqs (experimental),couchdb (experimental), sqlalchemy (experimental),django orm (experimental), ironmq
2.任务执行单元
worker是celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
3.任务结果存储
task result store用来存储worker执行的任务的结果,celery支持以不同方式存储任务的结果,包括amqp, redis,memcached, mongodb,sqlalchemy, django orm,apache cassandra, ironcache
另外, celery还支持不同的并发和序列化的手段
1.并发
prefork, eventlet, gevent, threads/single threaded
2.序列化
pickle, json, yaml, msgpack. zlib, bzip2 compression, cryptographic message signing 等等
安装和运行
celery的安装过程略为复杂,下面的安装过程是基于我的aws ec2的linux版本的安装过程,不同的系统安装过程可能会有差异。大家可以参考官方文档。
首先我选择rabbitmq作为消息中间件,所以要先安装rabbitmq。作为安装准备,先更新yum。
sudo yum -y update
rabbitmq是基于erlang的,所以先安装erlang
# add and enable relevant application repositories:
# note: we are also enabling third party remi package repositories.
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget http://rpms.famillecollet.com/enterprise/remi-release-6.rpm
sudo rpm -uvh remi-release-6*.rpm epel-release-6*.rpm
# finally, download and install erlang:
yum install -y erlang
然后安装rabbitmq
# download the latest rabbitmq package using wget: wget # add the necessary keys for verification: rpm --import # install the .rpm package using yum: yum install rabbitmq-server-3.2.2-1.noarch.rpm
启动rabbitmq服务
rabbitmq-server start
pip install celery
from celery import celery app = celery('tasks', backend='amqp', broker='amqp://guest@localhost//') app.conf.celery_result_backend = 'db+sqlite:///results.sqlite' @app.task def add(x, y): return x + y
在当前目录运行一个worker,用来执行这个加法的task
celery -a tasks worker --loglevel=info
其中-a参数表示的是celery app的名字。注意这里我使用的是sqlalchemy作为结果存储。对应的python包要事先安装好。
worker日志中我们会看到这样的信息
- ** ---------- [config] - ** ---------- .> app: tasks:0x1e68d50 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: db+sqlite:///results.sqlite - *** --- * --- .> concurrency: 8 (prefork)
其中,我们可以看到worker缺省使用prefork来执行并发,并设置并发数为8
下面的任务执行的客户端代码:
from tasks import add import time result = add.delay(4,4) while not result.ready(): print "not ready yet" time.sleep(5) print result.get()
用python执行这段客户端代码,在客户端,结果如下
not ready 8
work日志显示
[2015-03-12 02:54:07,973: info/mainprocess] received task: tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] [2015-03-12 02:54:08,006: info/mainprocess] task tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] succeeded in 0.0309705100954s: 8
这里我们可以发现,每一个task有一个唯一的id,task异步执行在worker上。
这里要注意的是,如果你运行官方文档中的例子,你是无法在客户端得到结果的,这也是我为什么要使用sqlalchemy来存储任务执行结果的原因。官方的例子使用ampq,有可能worker在打印日志的时候取出了task的运行结果显示在worker日志中,然而ampq作为一个消息队列,当消息被取走后,队列中就没有了,于是客户端总是无法得到任务的执行结果。不知道为什么官方文档对这样的错误视而不见。
如果大家想要对celery做更进一步的了解,请参考
上一篇: 详解Python中的多线程编程
下一篇: 鹅蛋怎么做既营养又好吃呢
推荐阅读
-
在Linux系统上通过uWSGI配置Nginx+Python环境的教程
-
在Linux上安装Python的Flask框架和创建第一个app实例的教程
-
在RedHat系Linux上部署Python的Celery框架的教程
-
在Python的Django框架上部署ORM库的教程
-
在Heroku云平台上部署Python的Django框架的教程
-
在Docker上开始部署Python应用的教程
-
Node.js环境在linux上的部署教程
-
在Linux系统上安装Python的Scrapy框架的教程
-
在Linux系统上通过uWSGI配置Nginx+Python环境的教程
-
在RedHat系的Linux中使用nmcli命令管理网络的教程