pyTorch模型部署--高并发web服务 c++ 移动端ncnn
1 综述
基于pyTorch的python接口训练的模型,一般会针对部署环境进行模型的转换。而深度学习模型的使用环境,一般会分成以python web服务的方式进行部署、以c++调用深度学习模型或针对移动式设备进行部署。
2 以python web服务的形式进行部署
在https://blog.csdn.net/cdknight_happy/article/details/100015592中,用docker + supervisor + nginx + gunicorn + flask的方式部署了深度学习的服务,如下面的代码所示:
#服务端代码
import numpy as np
import sys
import os
import torch
from flask import Flask,request,jsonify
import json
import torchvision
app = Flask(__name__)
model = torchvision.models.resnet18(pretrained=True)
model.eval()
def run_inference(in_tensor):
with torch.no_grad():
out_tensor = model(in_tensor.permute(2,0,1).unsqueeze(0))
prob,clas = torch.softmax(out_tensor,dim=1).max(dim=1)
out = {'label':clas.item()}
return out
@app.route('/predict',methods = ['POST'])
def predict():
meta = json.load(request.files['meta'])
blob = request.files['blob'].read()
in_tensor = torch.from_numpy(np.frombuffer(blob,dtype = np.int8))
in_tensor = in_tensor.view(*meta['shape'])
in_tensor = in_tensor.to(torch.float32) / 255.
out = run_inference(in_tensor)
return jsonify(out)
if __name__ == '__main__':
app.run(host='0.0.0.0',port=8000)
#客户端代码
import requests
import json
import io
import cv2
img = cv2.imread('./1.jpg')
meta = io.StringIO(json.dumps({'shape': list(img.shape)}))
data = io.BytesIO(bytearray(img))
r = requests.post('http://localhost:8000/predict',
files={'meta': meta, 'blob' : data})
response = json.loads(r.content)
print("class label is:", response['label'])
上述代码只是一个示例,并未使用wsgi进行生产环境的部署。但其仍然是使用flask进行web响应,代码存在下述四个方面的问题:
- 对于单个客户端发送的请求集合,只能顺序处理,只能在一个请求处理完成后才能处理另一个请求,效率低下;
- 因为GIL的存在,也无法在服务端开启多个线程异步响应请求;
- 数据拷贝过多,上述代码中,服务端接收到数据后,将bytes转换成numpy,然后再转换为torch.tensor,再进行数据的reshape操作。
深度学习模型批处理才可以最大限度的发挥其能力,因此可以使用sanic框架替代flask以提升处理效率。
下面的内容参考自《Deep Learning With PyTorch》第15章。
整体实现思路如下图所示:
client端发送多个请求,在server端先进行组batch,在组到指定数量的请求时或者第一个收到的请求已经等待了足够长的时间时,将请求集送入深度学习模型进行批处理,将处理结果顺序的返回给client端。代码实现中用到了协程实现异步操作。
#服务端代码
import sys
import asyncio
import itertools
import functools
from sanic import Sanic
from sanic.response import json, text
from sanic.log import logger
from sanic.exceptions import ServerError
import sanic
import threading
import PIL.Image
import io
import torch
import torchvision
# from cyclegan import get_pretrained_model
app = Sanic(__name__)
device = torch.device('cpu')
# we only run 1 inference run at any time (one could schedule between several runners if desired)
MAX_QUEUE_SIZE = 3 # we accept a backlog of MAX_QUEUE_SIZE before handing out "Too busy" errors
MAX_BATCH_SIZE = 2 # we put at most MAX_BATCH_SIZE things in a single batch
MAX_WAIT = 1 # we wait at most MAX_WAIT seconds before running for more inputs to arrive in batching
class HandlingError(Exception):
def __init__(self, msg, code=500):
super().__init__()
self.handling_code = code
self.handling_msg = msg
class ModelRunner:
def __init__(self):#, model_name
# self.model_name = model_name
self.queue = []
self.queue_lock = None
# self.model = get_pretrained_model(self.model_name,
# map_location=device)
self.model = torchvision.models.resnet18(pretrained=True)
self.model.eval()
for p in self.model.parameters():
p.requires_grad_(False)
self.needs_processing = None
self.needs_processing_timer = None
def schedule_processing_if_needed(self):
if len(self.queue) >= MAX_BATCH_SIZE:
logger.debug("next batch ready when processing a batch")
self.needs_processing.set()
elif self.queue:
logger.debug("queue nonempty when processing a batch, setting next timer")
self.needs_processing_timer = app.loop.call_at(self.queue[0]["time"] + MAX_WAIT, self.needs_processing.set)
async def process_input(self, input):
our_task = {"done_event": asyncio.Event(loop=app.loop),
"input": input,
"time": app.loop.time()}
async with self.queue_lock:
if len(self.queue) >= MAX_QUEUE_SIZE:
raise HandlingError("I'm too busy", code=503)
self.queue.append(our_task)
logger.debug("enqueued task. new queue size {}".format(len(self.queue)))
self.schedule_processing_if_needed()
await our_task["done_event"].wait()
return our_task["output"]
def run_model(self, batch): # runs in other thread
return self.model(batch.to(device)).to('cpu')
async def model_runner(self):
self.queue_lock = asyncio.Lock(loop=app.loop)
self.needs_processing = asyncio.Event(loop=app.loop)
# logger.info("started model runner for {}".format(self.model_name))
while True:
await self.needs_processing.wait()
self.needs_processing.clear()
if self.needs_processing_timer is not None:
self.needs_processing_timer.cancel()
self.needs_processing_timer = None
async with self.queue_lock:
if self.queue:
longest_wait = app.loop.time() - self.queue[0]["time"]
else: # oops
longest_wait = None
logger.debug("launching processing. queue size: {}. longest wait: {}".format(len(self.queue), longest_wait))
to_process = self.queue[:MAX_BATCH_SIZE]
del self.queue[:len(to_process)]
self.schedule_processing_if_needed()
# so here we copy, it would be neater to avoid this
batch = torch.stack([t["input"] for t in to_process], dim=0)
# we could delete inputs here...
result = await app.loop.run_in_executor(
None, functools.partial(self.run_model, batch)
)
for t, r in zip(to_process, result):
t["output"] = r
t["done_event"].set()
del to_process
style_transfer_runner = ModelRunner()#sys.argv[1]
@app.route('/image', methods=['POST'], stream=True)
async def image(request):
try:
print (request.headers)
content_length = int(request.headers.get('content-length', '0'))
MAX_SIZE = 2**22 # 10MB
if content_length:
if content_length > MAX_SIZE:
raise HandlingError("Too large")
data = bytearray(content_length)
else:
data = bytearray(MAX_SIZE)
pos = 0
while True:
# so this still copies too much stuff.
data_part = await request.stream.read()
if data_part is None:
break
data[pos: len(data_part) + pos] = data_part
pos += len(data_part)
if pos > MAX_SIZE:
raise HandlingError("Too large")
# ideally, we would minimize preprocessing...
im = PIL.Image.open(io.BytesIO(data))
im = torchvision.transforms.functional.resize(im, (228, 228))
im = torchvision.transforms.functional.to_tensor(im)
im = im[:3] # drop alpha channel if present
if im.dim() != 3 or im.size(0) < 3 or im.size(0) > 4:
raise HandlingError("need rgb image")
out_im = await style_transfer_runner.process_input(im)
out_im = torchvision.transforms.functional.to_pil_image(out_im)
imgByteArr = io.BytesIO()
out_im.save(imgByteArr, format='JPEG')
return sanic.response.raw(imgByteArr.getvalue(), status=200,
content_type='image/jpeg')
except HandlingError as e:
# we don't want these to be logged...
return sanic.response.text(e.handling_msg, status=e.handling_code)
app.add_task(style_transfer_runner.model_runner())
app.run(host="0.0.0.0", port=8000,debug=True)
#客户端代码
import requests
import json
import io
img_name = './1.jpg'
headers = {'Content-type': 'image/jpg'}
# r = requests.put('http://localhost:8000/image',data=open(img_name, "rb"),headers=headers)
r = requests.post('http://localhost:8000/image',data=open(img_name, "rb"),headers=headers)
response = json.loads(r.content)
print("class label is:", response['label'])
客户端也可以通过下面的指令提交图像:
curl -T data/p1ch2/horse.jpg http://localhost:8000/image --output /tmp/res.jpg
服务启动后会自动调用model_runner
进入等待,一旦客户端发送了请求过来,就通过process_input
函数进行了任务缓存,在任务达到指定数量后或者第一个任务等待了足够的时间后,就会取任务进行批量数据的推理。
参考:sanic官方地址 、asyncio、https://www.jianshu.com/p/636833c71c2a
3 c++调用方式进行部署
以c++方式部署模型时,需要首先将模型转换成在c++中能调用的方式。转换方式有torchscript、onnx和tensorRT。
模型转换过程中常见的错误是,某些操作在某些框架中是不支持的,需要自定义一些操作或者网络层。也有可能出现在两个不同框架下某个同名操作的实现是不完全一致的,因此模型转换过程中需要特别细致,具体问题具体分析解决。最好在模型转换前后,对同一个输入进行推理,对比推理结果是否一致以判断模型转换是否成功。
3.1 torchscript
torchscript是由pyTorch提供的用于进行模型高效率部署的环境,包含在torch.jit
模块中。torchscript提供了一些工具获取一个pyTorch模型的定义,将其动态图变为静态图,为每一个操作选取效率最高的实现,从而提升模型的推理效率。
参考https://pytorch.org/tutorials/beginner/Intro_to_TorchScript_tutorial.html、https://pytorch.org/tutorials/advanced/cpp_export.html、https://pytorch.org/docs/stable/jit.html、https://*.com/questions/62626052/what-are-the-differences-between-torch-jit-trace-and-torch-jit-script-in-torchsc。
3.1.1 模型转换
将pyTorch模组转换为torchscript模型,最关键的是torch.jit.trace
和torch.jit.script
两个函数。
3.1.1.1 torch.jit.trace
如下面的示例代码所示:
class MyCell(torch.nn.Module):
def __init__(self):
super(MyCell, self).__init__()
self.linear = torch.nn.Linear(4, 4)
def forward(self, x, h):
new_h = torch.tanh(self.linear(x) + h)
return new_h, new_h
my_cell = MyCell()
x, h = torch.rand(3, 4), torch.rand(3, 4)
traced_cell = torch.jit.trace(my_cell, (x, h))
print(traced_cell)
traced_cell(x, h)
print(traced_cell.code)
输出:
MyCell(
original_name=MyCell
(linear): Linear(original_name=Linear)
)
def forward(self,
input: Tensor,
h: Tensor) -> Tuple[Tensor, Tensor]:
_0 = torch.add((self.linear).forward(input, ), h, alpha=1)
_1 = torch.tanh(_0)
return (_1, _1)
torch.jit.trace
接受一个pyTorch模型和一个示例输入,得到一个固化后的模型。之所以要输入一个示例的输入,如下面代码中的(x,h)
,作用就是给定一个真实的输入,记录下模型在推理过程中到底要进行哪些操作,如为每一个卷积层选取效率最快的实现,舍弃模型中记录的多余的操作,从而将动态图转换为适合于当前输入的静态图,提升推理效率。traced_cell.code
中记录了模型具体执行的操作集合。
3.1.1.2 torch.jit.script
torch.jit.script
和torch.jit.trace
的作用是一致的,区别是两者适用于不同类型的模型。一句话概况就是
class MyDecisionGate(torch.nn.Module):
def forward(self, x):
if x.sum() > 0:
return x
else:
return -x
class MyCell(torch.nn.Module):
def __init__(self, dg):
super(MyCell, self).__init__()
self.dg = dg
self.linear = torch.nn.Linear(4, 4)
def forward(self, x, h):
new_h = torch.tanh(self.dg(self.linear(x)) + h)
return new_h, new_h
my_cell = MyCell(MyDecisionGate())
traced_cell = torch.jit.trace(my_cell, (x, h))
print(traced_cell.code)
输出:
def forward(self,
input: Tensor,
h: Tensor) -> Tuple[Tensor, Tensor]:
_0 = self.dg
_1 = (self.linear).forward(input, )
_2 = (_0).forward(_1, )
_3 = torch.tanh(torch.add(_1, h, alpha=1))
return (_3, _3)
输出_3 = torch.tanh(torch.add(_1, h, alpha=1))
中可以看出,add
操作是在_1
和h
上执行的,而不是在_2
和h
上执行的,也就是说,使用torch.jit.trace
时,MyDecisionGate
中的forward
过程被被忽略了,原因就在于torch.jit.trace
只记录在给定输入下模型实际执行的操作。那么MyDecisionGate
的forward
函数中包含了判断语句,这些控制语句是torch.jit.trace
无法解析的,因此在转换过程中被忽略了。
正确的做法是使用torch.jit.script
函数,模型代码保持不变,转换部分的代码变为:
scripted_gate = torch.jit.script(MyDecisionGate())
my_cell = MyCell(scripted_gate)
traced_cell = torch.jit.script(my_cell)
print(traced_cell.code)
输出:
def forward(self,
x: Tensor,
h: Tensor) -> Tuple[Tensor, Tensor]:
_0 = (self.dg).forward((self.linear).forward(x, ), )
new_h = torch.tanh(torch.add(_0, h, alpha=1))
return (new_h, new_h)
可以看到,self.dg
的前向函数参与了模型的转换过程。
对于网络前向运算过程中包含for循环
的,也可以得到相同的结论。因此,如果模型的前向运算过程中包含了if-else
判断或者for循环
等控制流,必须使用torch.jit.script
。但是如果代码中包含了torch.jit.script
不支持的多态,那么就应该使用torch.jit.trace
。
3.1.2 模型保存与加载
保存模型:
traced_cell.save('1.zip')
加载模型:
traced_cell = torch.jit.load('1.zip')
3.1.3 在c++中使用torchscript的模型(libtorch)
LibTorch为pyTorch的c++ API。从pyTorch官网下载libTorch,解压,使用头文件和库文件。
代码中明确需要使用的头文件有:#include <torch/script.h>
示例c++代码:
#include <torch/script.h> // One-stop header.
#include <iostream>
#include <memory>
int main(int argc, const char* argv[]) {
if (argc != 2) {
std::cerr << "usage: example-app <path-to-exported-script-module>\n";
return -1;
}
torch::jit::script::Module module;
try {
// Deserialize the ScriptModule from a file using torch::jit::load().
module = torch::jit::load(argv[1]);
}
catch (const c10::Error& e) {
std::cerr << "error loading the model\n";
return -1;
}
// Create a vector of inputs.
std::vector<torch::jit::IValue> inputs;
inputs.push_back(torch::ones({1, 3, 224, 224}));
// Execute the model and turn its output into a tensor.
at::Tensor output = module.forward(inputs).toTensor();
std::cout << output.slice(/*dim=*/1, /*start=*/0, /*end=*/5) << '\n';
std::cout << "ok\n";
}
3.2 onnx
3.2.1 pyTorch模型转onnx
安装onnx、onnxoptimizer、onnx-simplifier:
pip install onnxoptimizer onnx-simplifier onnx
pyTorch1.0后的版本自带了onnx模型输出,将pyTorch模型转换为onnx模型:
try:
import onnx
# Input
img = torch.zeros((opt.batch_size, 3, *opt.img_size)) # image size(1,3,320,192) iDetection
print('\nStarting ONNX export with onnx %s...' % onnx.__version__)
f = opt.weights.replace('.pt', '.onnx') # filename
model.fuse() # only for ONNX
torch.onnx.export(model, img, f, verbose=False, opset_version=12,
input_names=['images'],output_names=['classes', 'boxes'] if y is None else ['output'])
# Checks
onnx_model = onnx.load(f) # load onnx model
onnx.checker.check_model(onnx_model) # check onnx model
print(onnx.helper.printable_graph(onnx_model.graph)) # print a human readable model
print('ONNX export success, saved as %s' % f)
except Exception as e:
print('ONNX export failure: %s' % e)
torch.onnx.export
导出模型,onnx.checker.check_model
进行模型检查;
3.2.2 使用onnx-simplifier简化模型
python3 -m onnxsim input_onnx_model output_onnx_model
3.3 tensorrt
这里有很多关于tensorrt的介绍
官方地址:https://docs.nvidia.com/deeplearning/tensorrt/index.html
常见模型转换:https://github.com/wang-xinyu/tensorrtx
4 移动端进行部署
4.1 ncnn
官方地址:https://github.com/Tencent/ncnn
ncnn和tensorRT一样,专注于部署环境中的模型推理的优化。
pyTorch模型可以先转onnx(3.2节)再转为ncnn进行部署。
ubuntu下用vscode调试ncnn代码变量设置:
#task.json
{
"tasks": [
{
"type": "shell",
"label": "C/C++: g++ build active file",
"command": "/usr/bin/g++",
"args": [
"-g",
"${workspaceFolder}/yolov5.cpp",
"-o",
"${fileDirname}/${fileBasenameNoExtension}",
"-std=c++11",
"-fopenmp",
"-I${workspaceFolder}/../src",
"-I${workspaceFolder}/../build/src",
"-I/usr/local/include/opencv4",
"-L${workspaceFolder}/../build/src",
"-lncnn",
"-L/usr/local/lib",
"-lopencv_aruco",
"-lopencv_bgsegm",
"-lopencv_bioinspired",
"-lopencv_calib3d",
"-lopencv_ccalib",
"-lopencv_core",
"-lopencv_cudaarithm",
"-lopencv_cudabgsegm",
"-lopencv_cudacodec",
"-lopencv_cudafeatures2d",
"-lopencv_cudafilters",
"-lopencv_cudaimgproc",
"-lopencv_cudalegacy",
"-lopencv_cudaobjdetect",
"-lopencv_cudaoptflow",
"-lopencv_cudastereo",
"-lopencv_cudawarping",
"-lopencv_cudev",
"-lopencv_datasets",
"-lopencv_dnn_objdetect",
"-lopencv_dnn",
"-lopencv_dpm",
"-lopencv_face",
"-lopencv_features2d",
"-lopencv_flann",
"-lopencv_freetype",
"-lopencv_fuzzy",
"-lopencv_gapi",
"-lopencv_hdf",
"-lopencv_hfs",
"-lopencv_highgui",
"-lopencv_imgcodecs",
"-lopencv_img_hash",
"-lopencv_imgproc",
"-lopencv_line_descriptor",
"-lopencv_ml",
"-lopencv_objdetect",
"-lopencv_optflow",
"-lopencv_phase_unwrapping",
"-lopencv_photo",
"-lopencv_plot",
"-lopencv_quality",
"-lopencv_reg",
"-lopencv_rgbd",
"-lopencv_saliency",
"-lopencv_shape",
"-lopencv_stereo",
"-lopencv_stitching",
"-lopencv_structured_light",
"-lopencv_superres",
"-lopencv_surface_matching",
"-lopencv_text",
"-lopencv_tracking",
"-lopencv_videoio",
"-lopencv_video",
"-lopencv_videostab",
"-lopencv_xfeatures2d",
"-lopencv_ximgproc",
"-lopencv_xobjdetect",
"-lopencv_xphoto",
"-L/usr/lib/x86_64-linux-gnu",
"-lvulkan",
"-L${workspaceFolder}/../build/glslang/glslang",
"-lglslang",
"-L${workspaceFolder}/../build/glslang/glslang/OSDependent/Unix",
"-lOSDependent",
"-L${workspaceFolder}/../build/glslang/OGLCompilersDLL",
"-lOGLCompiler",
"-L${workspaceFolder}/../build/glslang/SPIRV",
"-lSPIRV"
],
"options": {
"cwd": "${workspaceFolder}"
},
"problemMatcher": [
"$gcc"
],
"group": {
"kind": "build",
"isDefault": true
}
}
],
"version": "2.0.0"
}
关键点:"-std=c++11", "-fopenmp",-lncnn,-lvulkan,-lglslang,-lOSDependent,-lOGLCompiler,-lSPIRV
;
参考https://zhuanlan.zhihu.com/p/275989233?utm_source=qq学习如何自定义层、如何修改网络参数、如何进行模型转换。
4.2 pyTorch mobile
官方地址:https://pytorch.org/mobile/home/
本文地址:https://blog.csdn.net/cdknight_happy/article/details/110373458