grpc练习
程序员文章站
2024-03-14 14:22:04
...
先写 .gropo文件,传输数据
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=.reco.proto
-
-I
需要导入的proto文件从哪个目录中寻找 (-I.
表示从当前目录中查找)
---python_out
proto文件中定义的message 字段生成的python代码文件保存到哪个目录 -
--grpc_python_out
proto文件中定义的service 字段生成的python代码文件保存到哪个目录
在这个文件夹右击 updown把那两个文件下载下来
写server.py文件
- 带_grpc的文件能找到类名
- 方法名就是双方定义的方法名
- 不带_grpc的文件 . (带_grpc的文件下面)xxxResponse来创建传输数据的对象
写client.py文件
运行server文件
python server.py
运行client文件
python client.py
以下是实例代码
# reco.proto
syntax = "proto3";
message UserRequest {
string user_id=1;
int32 channel_id=2;
int32 article_num=3;
int64 time_stamp=4;
}
message Track {
string click=1;
string collect=2;
string share=3;
string read=4;
}
message Article {
int64 article_id=1;
Track track=2;
}
message ArticleResponse {
string exposure=1;
int64 time_stamp=2;
repeated Article recommends=3;
}
service UserRecommend {
rpc user_recommend(UserRequest) returns(ArticleResponse) {}
}
# reco_pb2_grpc.py
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import reco_pb2 as reco__pb2
class UserRecommendStub(object):
# missing associated documentation comment in .proto file
pass
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.user_recommend = channel.unary_unary(
'/UserRecommend/user_recommend',
request_serializer=reco__pb2.UserRequest.SerializeToString,
response_deserializer=reco__pb2.ArticleResponse.FromString,
)
class UserRecommendServicer(object):
# missing associated documentation comment in .proto file
pass
def user_recommend(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_UserRecommendServicer_to_server(servicer, server):
rpc_method_handlers = {
'user_recommend': grpc.unary_unary_rpc_method_handler(
servicer.user_recommend,
request_deserializer=reco__pb2.UserRequest.FromString,
response_serializer=reco__pb2.ArticleResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'UserRecommend', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# client.py
import grpc
import reco_pb2_grpc
import reco_pb2
import time
def run():
# 连接rpc服务器得到与rpc连接的对象
with grpc.insecure_channel('127.0.0.1:8888') as conn:
# 创建进行rpc调用的工具对象
stub = reco_pb2_grpc.UserRecommendStub(conn)
# 通过工具对象进行rpc函数调用
req = reco_pb2.UserRequest()
req.user_id = '1'
req.channel_id = 12
req.article_num = 10
req.time_stamp = round(time.time() * 1000)
ret = stub.user_recommend(req)
# ret -> ArticleResponse 对象
print(ret)
if __name__ == '__main__':
run()
# server.py
import grpc
from concurrent.futures import ThreadPoolExecutor
import time
import reco_pb2_grpc
import reco_pb2
# 补全被调用的代码
class UserRecommendServicer(reco_pb2_grpc.UserRecommendServicer):
# 在接口定义的同名方法中补全,被调用时应该执行的逻辑
def user_recommend(self, request, context):
# request是调用的请求数据对象
user_id = request.user_id
channel_id = request.channel_id
article_num = request.article_num
time_stamp = request.time_stamp
response = reco_pb2.ArticleResponse()
response.exposure = 'exposure param'
response.time_stamp = round(time.time() * 1000)
recommends = []
for i in range(article_num):
article = reco_pb2.Article()
article.track.click = 'click param {}'.format(i + 1)
article.track.collect = 'collect param {}'.format(i + 1)
article.track.share = 'share param {}'.format(i + 1)
article.track.read = 'read param {}'.format(i + 1)
article.article_id = i + 1
recommends.append(article)
response.recommends.extend(recommends)
# 最终要返回一个调用结果
return response
# 编写rpc服务器运行代码
def serve():
# 创建rpc服务器对象
server = grpc.server(ThreadPoolExecutor(max_workers=20))
# 为rpc服务器 注册能够对外提供的代码
reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendServicer(), server)
# 绑定rpc IP地址和端口号
server.add_insecure_port('127.0.0.1:8888')
# 启动服务器
server.start() # 非阻塞
# 为了防止程序因为非阻塞退出,手动阻塞
while True:
time.sleep(10)
if __name__ == '__main__':
serve()