欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

建模杂谈系列39- celery + redis的单机异步并行

程序员文章站 2024-03-08 21:44:52
...

说明

延续建模杂谈系列38- 基于celery、rabbitmq、redis和asyncio的分布并行处理(概述)的内容,我们假设任务已经优化致单核满载,现在使用异步并行来加快这个过程。这同步异步的让我想起了ATM和SDH,好多年前的事了,只能说计算机行业和通信行业渊源太深了。

目的:使用异步并行加快DataFrame的列变换工作。

因为redis是基于内存的数据库,速度会比较快,在单机上作为broker应该会非常快。所以单机情况下,我们使用celery+redis。关于redis可以参考Redis是什么?看这一篇就够了

Redis是现在最受欢迎的NoSQL数据库之一,Redis是一个使用ANSI C编写的开源、包含多种数据结构、支持网络、基于内存、可选持久性的键值对存储数据库

1 内容

本篇先基于异步任务神器 Celery 快速入门教程进行实验,但是这篇文章是16年的(太老了),所以之后会找更新的版本进行更新。

1.1 结构

构建的结构如下:

celery_demo
├── celery_app
│   ├── __init__.py
│   ├── celeryconfig.py
│   ├── task1.py
│   └── task2.py
└── client.py

在项目文件下有一个client.py, 里面模拟的是用户调用函数的情况(使用者)。

from celery_app import task1
from celery_app import task2

import time 
start = time.time() * 1000

task1.add.delay(2, 8)
task2.multiply.delay(3, 7)
task1.add.delay(2, 8)
task2.multiply.delay(3, 7)
task1.add.delay(2, 8)
task2.multiply.delay(3, 7)
task1.add.delay(2, 8)
task2.multiply.delay(3, 7)

print('hello world')
end = time.time() * 1000
print('takes %.2f ms' %(end-start))

celeryconfig.py中定义了配置。

BROKER_URL = 'redis://127.0.0.1:6379'               # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'  # 指定 Backend

CELERY_TIMEZONE = 'Asia/Shanghai'                     # 指定时区,默认是 UTC
# CELERY_TIMEZONE='UTC'

CELERY_IMPORTS = (                                  # 指定导入的任务模块
    'celery_app.task1',
    'celery_app.task2'
)

task1.py, task2.py中定义了具体需要计算的函数

import time
from celery_app import app


@app.task
def add(x, y):
    time.sleep(2)
    return x + y

---

import time
from celery_app import app


@app.task
def multiply(x, y):
    time.sleep(2)
    return x * y

__init__.py中定义了基本的app(这个和flask的基本配置非常相似)Python 全栈系列38 - 搭建网站模板

from celery import Celery

# 和flask的结构很像
app = Celery('demo')                                # 创建 Celery 实例
app.config_from_object('celery_app.celeryconfig')   # 通过 Celery 实例加载配置模块

在项目根目录下执行命令

celery -A celery_app worker --loglevel=info

建模杂谈系列39- celery + redis的单机异步并行

2.2 实验

可以看到,执行的速度很快(因为程序并没有管是否计算完,发布完计算任务就闪了)

YOURPATH/celery_demo  python3 client.py
hello world
takes 288.03 ms
YOURPATHs/celery_demo  python3 client.py
hello world
takes 233.41 ms
YOURPATH/celery_demo  python3 client.py
hello world
takes 231.70 ms
YOURPATH/celery_demo 

我打开了两个celery服务,可以看到两个服务是各自都有计算的(并行)
第一个:
建模杂谈系列39- celery + redis的单机异步并行
第二个:
建模杂谈系列39- celery + redis的单机异步并行

3 进一步实验

在保持celery服务不中断的情况下,我更新task的种类,完成开头提到的任务(并行执行DataFrame的列变换)

单机的程序single_proc.py如下,算是一类常见的数据处理场景吧。我们假设apply方法已经使cpu单核合理满载了。(这里有个额外的开销是pickle,因为要通过队列持久化不能是对象)

import pandas as pd 
import numpy as np 
import time

df = pd.DataFrame(np.random.randn(1000000,5), columns=list('ABCDE'))

start = time.time() * 1000
for col in df.columns:
    df[col].apply(lambda x:x**2).apply(lambda x: x+ 1000).apply(np.log)


end = time.time() * 1000
print('takes %.2f ms' % (end-start))

---

takes 3035.24 ms

直接添加task3, 把变换函数写进去。(注意,此时celery服务还没停,我们来看看是否会向flask自动热加载)

import time
from celery_app import app
import numpy as np 

@app.task
def col_transform(x):
    s1 = x
    print(s1.apply(lambda x: x**2).apply(lambda x: x + 1000).apply(np.log).head())

修改celeryconfig.py在执行后续计算的时候发现数据的序列化问题,需要在原文的配置上增加内容。注意pickle是有可能被作为一个漏洞进行恶意攻击的,所以在使用celery时需要加一些安全的设置。例如只允许通过本机的服务提交任务。

...
CELERY_IMPORTS = (                                  # 指定导入的任务模块
    'celery_app.task1',
    'celery_app.task2',
    'celery_app.task3'
)

# celery允许接收的数据格式,可以是一个字符串,比如'json'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
# 异步任务的序列化器,也可以是json
CELERY_TASK_SERIALIZER = 'pickle'
# 任务结果的数据格式,也可以是json
CELERY_RESULT_SERIALIZER = 'pickle'


编写mul_proc.py

from celery_app import task1
from celery_app import task2
from celery_app import task3
import time
import pandas as pd 
import numpy as np 


df = pd.DataFrame(np.random.randn(1000000, 5), columns=list('ABCDE'))

start = time.time() * 1000
for col in df.columns:
    task3.col_transform(df[col])

end = time.time() * 1000
print('takes %.2f ms' % (end-start))

执行:

python3 mul_proc.py
---
0    6.908283
1    6.908282
2    6.908983
3    6.910506
4    6.909736
Name: A, dtype: float64
0    6.907761
1    6.908139
2    6.909240
3    6.909176
4    6.909002
Name: B, dtype: float64
0    6.911355
1    6.908363
2    6.907783
3    6.907757
4    6.909631
Name: C, dtype: float64
0    6.911155
1    6.907763
2    6.907756
3    6.907869
4    6.908886
Name: D, dtype: float64
0    6.908937
1    6.907795
2    6.907842
3    6.907821
4    6.907848
Name: E, dtype: float64
takes 3147.68 ms

和之前的时间相差无几(稍微长一点,因为有序列化过程),因为提交的时候没有用delay方法,再来一次

from celery_app import task1
from celery_app import task2
from celery_app import task3
import time
import pandas as pd 
import numpy as np 


df = pd.DataFrame(np.random.randn(1000000, 5), columns=list('ABCDE'))

start = time.time() * 1000
for col in df.columns:
    task3.col_transform.delay(df[col])

end = time.time() * 1000
print('takes %.2f ms' % (end-start))
---
takes 1636.87 ms

总的计算时间几乎减半(但多了一点开销),还是很合理的。可以在服务器上看到日志(每列的执行时间约700ms)
建模杂谈系列39- celery + redis的单机异步并行

再开一个celery。在三个celery之下似乎没有提升(我的mac是4核的)。我把这个迁移到台式机上(8核)测试。
应该不用再测了,之前因为计算的时间太短了,几乎都是在开销上(数据传输)。我把每个迭代需要计算的时间增加一秒后,单进程跑100s。多进程大约20秒。(理论应该是33秒才对)

def col_transform(x):
    s1 = x
    time.sleep(1)
    print(s1.apply(lambda x: x**2).apply(lambda x: x + 1000).apply(np.log).head())

另外,如果纯粹只是单机多进程的话,那么multiprocess之类的应该会更好(因为没有通过broker进行序列和反序列化)。不过未来更多考虑的还是分布式计算(一个机器再强也有限)。

4 其他

delay 是使用 apply_async 的快捷方式。apply_async 支持更多的参数

celery在执行任务(apply_async)时还允许

  • 1 countdown:指定多少秒后执行任务
  • 2 eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime
# 1 countdown
task1.apply_async(args=(2, 3), countdown=5)    # 5 秒后执行任务
# 2 eta
from datetime import datetime, timedelta
# 当前 UTC 时间再加 10 秒后执行任务
task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))

通过 Celery Beat还可以执行周期性任务

‘’‘在 Worker 窗口我们可以看到,任务 task1 每 30 秒执行一次,而 task2 每天早上 9 点 50 分执行一次。’’’

因为这篇文章太老,之后会在最新的文档上进行更好的构建。(有些方法被depreciated了)

相关标签: 建模