欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

grpc练习

程序员文章站 2024-03-14 14:22:04
...

先写 .gropo文件,传输数据

python -m grpc_tools.protoc -I.  --python_out=. --grpc_python_out=.reco.proto  
  • -I 需要导入的proto文件从哪个目录中寻找 (-I. 表示从当前目录中查找)
    ---python_out proto文件中定义的message 字段生成的python代码文件保存到哪个目录
  • --grpc_python_out proto文件中定义的service 字段生成的python代码文件保存到哪个目录

在这个文件夹右击 updown把那两个文件下载下来

写server.py文件

  • 带_grpc的文件能找到类名
  • 方法名就是双方定义的方法名
  • 不带_grpc的文件 . (带_grpc的文件下面)xxxResponse来创建传输数据的对象

写client.py文件

运行server文件

python server.py

运行client文件

python client.py

以下是实例代码

# reco.proto

syntax = "proto3";

message UserRequest {
    string user_id=1;
    int32 channel_id=2;
    int32 article_num=3;
    int64 time_stamp=4;
}

message Track {
    string click=1;
    string collect=2;
    string share=3;
    string read=4;
}

message Article {
    int64 article_id=1;
    Track track=2;
}

message ArticleResponse {
    string exposure=1;
    int64 time_stamp=2;
    repeated Article recommends=3;
}

service UserRecommend {
    rpc user_recommend(UserRequest) returns(ArticleResponse) {}
}
# reco_pb2_grpc.py
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc

import reco_pb2 as reco__pb2


class UserRecommendStub(object):
  # missing associated documentation comment in .proto file
  pass

  def __init__(self, channel):
    """Constructor.

    Args:
      channel: A grpc.Channel.
    """
    self.user_recommend = channel.unary_unary(
        '/UserRecommend/user_recommend',
        request_serializer=reco__pb2.UserRequest.SerializeToString,
        response_deserializer=reco__pb2.ArticleResponse.FromString,
        )


class UserRecommendServicer(object):
  # missing associated documentation comment in .proto file
  pass

  def user_recommend(self, request, context):
    # missing associated documentation comment in .proto file
    pass
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    context.set_details('Method not implemented!')
    raise NotImplementedError('Method not implemented!')


def add_UserRecommendServicer_to_server(servicer, server):
  rpc_method_handlers = {
      'user_recommend': grpc.unary_unary_rpc_method_handler(
          servicer.user_recommend,
          request_deserializer=reco__pb2.UserRequest.FromString,
          response_serializer=reco__pb2.ArticleResponse.SerializeToString,
      ),
  }
  generic_handler = grpc.method_handlers_generic_handler(
      'UserRecommend', rpc_method_handlers)
  server.add_generic_rpc_handlers((generic_handler,))

# client.py

import grpc
import reco_pb2_grpc
import reco_pb2
import time


def run():
    # 连接rpc服务器得到与rpc连接的对象
    with grpc.insecure_channel('127.0.0.1:8888') as conn:
        # 创建进行rpc调用的工具对象
        stub = reco_pb2_grpc.UserRecommendStub(conn)

        # 通过工具对象进行rpc函数调用
        req = reco_pb2.UserRequest()
        req.user_id = '1'
        req.channel_id = 12
        req.article_num = 10
        req.time_stamp = round(time.time() * 1000)

        ret = stub.user_recommend(req)
        # ret -> ArticleResponse 对象
        print(ret)


if __name__ == '__main__':
    run()

# server.py

import grpc
from concurrent.futures import ThreadPoolExecutor
import time
import reco_pb2_grpc
import reco_pb2


# 补全被调用的代码
class UserRecommendServicer(reco_pb2_grpc.UserRecommendServicer):
    # 在接口定义的同名方法中补全,被调用时应该执行的逻辑
    def user_recommend(self, request, context):
        # request是调用的请求数据对象
        user_id = request.user_id
        channel_id = request.channel_id
        article_num = request.article_num
        time_stamp = request.time_stamp

        response = reco_pb2.ArticleResponse()
        response.exposure = 'exposure param'
        response.time_stamp = round(time.time() * 1000)
        recommends = []
        for i in range(article_num):
            article = reco_pb2.Article()
            article.track.click = 'click param {}'.format(i + 1)
            article.track.collect = 'collect param {}'.format(i + 1)
            article.track.share = 'share param {}'.format(i + 1)
            article.track.read = 'read param {}'.format(i + 1)
            article.article_id = i + 1
            recommends.append(article)
        response.recommends.extend(recommends)

        # 最终要返回一个调用结果
        return response


# 编写rpc服务器运行代码
def serve():
    # 创建rpc服务器对象
    server = grpc.server(ThreadPoolExecutor(max_workers=20))

    # 为rpc服务器 注册能够对外提供的代码
    reco_pb2_grpc.add_UserRecommendServicer_to_server(UserRecommendServicer(), server)

    # 绑定rpc IP地址和端口号
    server.add_insecure_port('127.0.0.1:8888')

    # 启动服务器
    server.start()  # 非阻塞

    # 为了防止程序因为非阻塞退出,手动阻塞
    while True:
        time.sleep(10)


if __name__ == '__main__':
    serve()
相关标签: grpc