建模杂谈系列39- celery + redis的单机异步并行
说明
延续建模杂谈系列38- 基于celery、rabbitmq、redis和asyncio的分布并行处理(概述)的内容,我们假设任务已经优化致单核满载,现在使用异步并行来加快这个过程。这同步异步的让我想起了ATM和SDH,好多年前的事了,只能说计算机行业和通信行业渊源太深了。
目的:使用异步并行加快DataFrame的列变换工作。
因为redis是基于内存的数据库,速度会比较快,在单机上作为broker应该会非常快。所以单机情况下,我们使用celery+redis。关于redis可以参考Redis是什么?看这一篇就够了
Redis是现在最受欢迎的NoSQL数据库之一,Redis是一个使用ANSI C编写的开源、包含多种数据结构、支持网络、基于内存、可选持久性的键值对存储数据库
1 内容
本篇先基于异步任务神器 Celery 快速入门教程进行实验,但是这篇文章是16年的(太老了),所以之后会找更新的版本进行更新。
1.1 结构
构建的结构如下:
celery_demo
├── celery_app
│ ├── __init__.py
│ ├── celeryconfig.py
│ ├── task1.py
│ └── task2.py
└── client.py
在项目文件下有一个client.py
, 里面模拟的是用户调用函数的情况(使用者)。
from celery_app import task1
from celery_app import task2
import time
start = time.time() * 1000
task1.add.delay(2, 8)
task2.multiply.delay(3, 7)
task1.add.delay(2, 8)
task2.multiply.delay(3, 7)
task1.add.delay(2, 8)
task2.multiply.delay(3, 7)
task1.add.delay(2, 8)
task2.multiply.delay(3, 7)
print('hello world')
end = time.time() * 1000
print('takes %.2f ms' %(end-start))
在celeryconfig.py
中定义了配置。
BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC
# CELERY_TIMEZONE='UTC'
CELERY_IMPORTS = ( # 指定导入的任务模块
'celery_app.task1',
'celery_app.task2'
)
在task1.py, task2.py
中定义了具体需要计算的函数
import time
from celery_app import app
@app.task
def add(x, y):
time.sleep(2)
return x + y
---
import time
from celery_app import app
@app.task
def multiply(x, y):
time.sleep(2)
return x * y
在__init__.py
中定义了基本的app(这个和flask的基本配置非常相似)Python 全栈系列38 - 搭建网站模板
from celery import Celery
# 和flask的结构很像
app = Celery('demo') # 创建 Celery 实例
app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块
在项目根目录下执行命令
celery -A celery_app worker --loglevel=info
2.2 实验
可以看到,执行的速度很快(因为程序并没有管是否计算完,发布完计算任务就闪了)
YOURPATH/celery_demo python3 client.py
hello world
takes 288.03 ms
YOURPATHs/celery_demo python3 client.py
hello world
takes 233.41 ms
YOURPATH/celery_demo python3 client.py
hello world
takes 231.70 ms
YOURPATH/celery_demo
我打开了两个celery服务,可以看到两个服务是各自都有计算的(并行)
第一个:
第二个:
3 进一步实验
在保持celery服务不中断的情况下,我更新task的种类,完成开头提到的任务(并行执行DataFrame的列变换)
单机的程序single_proc.py
如下,算是一类常见的数据处理场景吧。我们假设apply方法已经使cpu单核合理满载了。(这里有个额外的开销是pickle,因为要通过队列持久化不能是对象)
import pandas as pd
import numpy as np
import time
df = pd.DataFrame(np.random.randn(1000000,5), columns=list('ABCDE'))
start = time.time() * 1000
for col in df.columns:
df[col].apply(lambda x:x**2).apply(lambda x: x+ 1000).apply(np.log)
end = time.time() * 1000
print('takes %.2f ms' % (end-start))
---
takes 3035.24 ms
直接添加task3, 把变换函数写进去。(注意,此时celery服务还没停,我们来看看是否会向flask自动热加载)
import time
from celery_app import app
import numpy as np
@app.task
def col_transform(x):
s1 = x
print(s1.apply(lambda x: x**2).apply(lambda x: x + 1000).apply(np.log).head())
修改celeryconfig.py
在执行后续计算的时候发现数据的序列化问题,需要在原文的配置上增加内容。注意pickle是有可能被作为一个漏洞进行恶意攻击的,所以在使用celery时需要加一些安全的设置。例如只允许通过本机的服务提交任务。
...
CELERY_IMPORTS = ( # 指定导入的任务模块
'celery_app.task1',
'celery_app.task2',
'celery_app.task3'
)
# celery允许接收的数据格式,可以是一个字符串,比如'json'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
# 异步任务的序列化器,也可以是json
CELERY_TASK_SERIALIZER = 'pickle'
# 任务结果的数据格式,也可以是json
CELERY_RESULT_SERIALIZER = 'pickle'
编写mul_proc.py
from celery_app import task1
from celery_app import task2
from celery_app import task3
import time
import pandas as pd
import numpy as np
df = pd.DataFrame(np.random.randn(1000000, 5), columns=list('ABCDE'))
start = time.time() * 1000
for col in df.columns:
task3.col_transform(df[col])
end = time.time() * 1000
print('takes %.2f ms' % (end-start))
执行:
python3 mul_proc.py
---
0 6.908283
1 6.908282
2 6.908983
3 6.910506
4 6.909736
Name: A, dtype: float64
0 6.907761
1 6.908139
2 6.909240
3 6.909176
4 6.909002
Name: B, dtype: float64
0 6.911355
1 6.908363
2 6.907783
3 6.907757
4 6.909631
Name: C, dtype: float64
0 6.911155
1 6.907763
2 6.907756
3 6.907869
4 6.908886
Name: D, dtype: float64
0 6.908937
1 6.907795
2 6.907842
3 6.907821
4 6.907848
Name: E, dtype: float64
takes 3147.68 ms
和之前的时间相差无几(稍微长一点,因为有序列化过程),因为提交的时候没有用delay方法,再来一次
from celery_app import task1
from celery_app import task2
from celery_app import task3
import time
import pandas as pd
import numpy as np
df = pd.DataFrame(np.random.randn(1000000, 5), columns=list('ABCDE'))
start = time.time() * 1000
for col in df.columns:
task3.col_transform.delay(df[col])
end = time.time() * 1000
print('takes %.2f ms' % (end-start))
---
takes 1636.87 ms
总的计算时间几乎减半(但多了一点开销),还是很合理的。可以在服务器上看到日志(每列的执行时间约700ms)
再开一个celery。在三个celery之下似乎没有提升(我的mac是4核的)。我把这个迁移到台式机上(8核)测试。
应该不用再测了,之前因为计算的时间太短了,几乎都是在开销上(数据传输)。我把每个迭代需要计算的时间增加一秒后,单进程跑100s。多进程大约20秒。(理论应该是33秒才对)
def col_transform(x):
s1 = x
time.sleep(1)
print(s1.apply(lambda x: x**2).apply(lambda x: x + 1000).apply(np.log).head())
另外,如果纯粹只是单机多进程的话,那么multiprocess之类的应该会更好(因为没有通过broker进行序列和反序列化)。不过未来更多考虑的还是分布式计算(一个机器再强也有限)。
4 其他
delay 是使用 apply_async 的快捷方式。apply_async 支持更多的参数
celery在执行任务(apply_async)时还允许
- 1 countdown:指定多少秒后执行任务
- 2 eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime
# 1 countdown
task1.apply_async(args=(2, 3), countdown=5) # 5 秒后执行任务
# 2 eta
from datetime import datetime, timedelta
# 当前 UTC 时间再加 10 秒后执行任务
task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))
通过 Celery Beat还可以执行周期性任务
‘’‘在 Worker 窗口我们可以看到,任务 task1 每 30 秒执行一次,而 task2 每天早上 9 点 50 分执行一次。’’’
因为这篇文章太老,之后会在最新的文档上进行更好的构建。(有些方法被depreciated了)