欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

.NET Core + gRPC 实现数据串流 (Streaming)

程序员文章站 2022-05-19 10:45:15
引入 gRPC 是谷歌推出的一个高性能优秀的 RPC 框架,基于 HTTP/2 实现。并且该框架对 .NET Core 有着优秀的支持。最近在做一个项目正好用到了 gRPC,遇到了需要串流传输的问题。 引入 项目创建 首先还是需要安装 .net core sdk,可以去 http://dot.net ......

引入

grpc 是谷歌推出的一个高性能优秀的 rpc 框架,基于 http/2 实现。并且该框架对 .net core 有着优秀的支持。
最近在做一个项目正好用到了 grpc,遇到了需要串流传输的问题。

项目创建

首先还是需要安装 .net core sdk,可以去 http://dot.net 下载。这里我使用的是 2.2.103 版本的 sdk。
mkdir rpcstreaming
cd rpcstreaming
dotnet new console
dotnet add package grpc // 添加 grpc 包
dotnet add package grpc.tools // 添加 grpc 工具包
dotnet add package google.protobuf // 添加 protobuf 支持

然后为了支持 protobuf 语言,我们需要修改项目配置文件,在项目中引入 .proto 文件以便生成对应的代码。

在 rpcstreaming.csproj 中,加入<protobuf include="**/*.proto" />,除此之外还需要启用最新语言支持(c# 7.3),方便我们将 main 函数直接写为 async 函数,直接设置为最新版本的语言即可,如下所示:
<project sdk="microsoft.net.sdk">
  ...
  <propertygroup>
    ...
    <langversion>latest</langversion>
    ...
  </propertygroup>
  
  <itemgroup>
    ...
    <protobuf include="**/*.proto" />
    ...
  </itemgroup>
  ...
</project>

这里我们使用了 wildcard 语法匹配了项目内的全部 proto 文件用于生成对应的代码。

到这里,项目的创建就完成了。

编写 proto 文件

我们在项目目录下建立一个 .proto 文件,用于描述 rpc 调用和消息类型。比如:rpcstreaming.proto
内容如下:
 1 synatx = "proto3";
 2 service rpcstreamingservice {
 3   rpc getstreamcontent (streamrequest) returns (stream streamcontent) {}
 4 }
 5 message streamrequest {
 6   string filename = 1;
 7 }
 8 message streamcontent {
 9   bytes content = 1;
10 }

做 rpc 请求时,我们向 rpc 服务器发送一个 streamrequest 的 message,其中包含了文件路径;为了让服务器以流式传输数据,我们在 returns 内加一个 “stream”。

保存后,我们执行一次 dotnet build,这样就会在 ./obj/debug/netcoreapp2.2下自动生成 rpc 调用和消息类型的代码。

编写 server 端代码

为了编写 rpc 调用服务端代码,我们需要重写自动生成的 c# 虚函数。
首先我们进入 ./obj/debug/netcoreapp2.2 看看自动生成了什么代码。
rpcstreaming.cs 中包含消息类型的定义,rpcstreaminggrpc.cs 中包含了对应 rpc 调用的函数原型。
我们查找一下我们刚刚在 proto 文件中声明的 getstreamcontent。
可以在里面找到一个上方文档注释为 “base class for server-side implementations rpcstreamingservicebase” 的抽象类 rpcstreamingservicebase,里面包含了我们要找的东西。
可以找到我们的 getstreamcontent 的默认实现:
public virtual global::system.threading.tasks.task getstreamcontent(global::streamrequest request, grpc::iserverstreamwriter<global::streamcontent> responsestream, grpc::servercallcontext context)
{
    throw new grpc::rpcexception(new grpc::status(grpc::statuscode.unimplemented, ""));
}

这样就简单了,我们新建一个类 rpcserviceimpl,继承 rpcstreamingservice.rpcstreamingservicebase,然后实现对应的方法即可。

为了串流,我们需要将数据流不断写入 response,这里给一个简单的示例。
 1 using system;
 2 using system.io;
 3 using system.threading.tasks;
 4 using google.protobuf;
 5 using grpc.core;
 6 namespace rpcstreaming
 7 {
 8     public class rpcstreamingserviceimpl : rpcstreamingservice.rpcstreamingservicebase
 9     {
10         public override task getstreamcontent(streamrequest request, iserverstreamwriter<streamcontent> response, servercallcontext context)
11         {
12             return task.run(async () =>
13             {
14                 using (var fs = file.open(request.filename, filemode.open)) // 从 request 中读取文件名并打开文件流
15           {
16                     var remaininglength = fs.length; // 剩余长度
17               var buff = new byte[1048576]; // 缓冲区,这里我们设置为 1 mb
18               while (remaininglength > 0) // 若未读完则继续读取
19               {
20                         var len = await fs.readasync(buff); // 异步从文件中读取数据到缓冲区中
21                   remaininglength -= len; // 剩余长度减去刚才实际读取的长度
22 
23                   // 向流中写入我们刚刚读取的数据
24                   await response.writeasync(new streamcontent
25                         {
26                             content = bytestring.copyfrom(buff, 0, len)
27                         });
28                     }
29                 }
30             });
31         }
32     }
33 }

 

启动 rpc server

首先需要:
1 using google.protobuf;
2 using grpc.core;

然后我们在 main 函数中构建并启动 rpc server,监听 localhost:23333

1 new server
2 {
3     services = { rpcstreamingservice.bindservice(new rpcstreamingserviceimpl()) }, // 绑定我们的实现
4     ports = { new serverport("localhost", 23333, servercredentials.insecure) }
5 }.start();
6 console.readkey();

这样服务端就构建完成了。

编写客户端调用 rpc api

方便起见,我们先将 main 函数改写为 async 函数。
1 // 原来的 main 函数
2 static void main(string[] args) { ... }
3 // 改写后的 main 函数
4 static async task main(string[] args) { ... }

另外,还需要:

1 using system;
2 using system.io;
3 using system.threading.tasks;
4 using google.protobuf;
5 using grpc.core;

然后我们在 main 函数中添加调用代码:

 1 var channel = new channel("localhost:23333", channelcredentials.insecure); // 建立到 localhost:23333 的 channel
 2 var client = new rpcstreamingservice.rpcstreamingserviceclient(channel); // 建立 client
 3 // 调用 rpc api
 4 var result = client.getstreamcontent(new streamrequest { filename = "你想获取的文件路径" });
 5 var iter = result.responsestream; // 拿到响应流
 6 using (var fs = new filestream("写获取的数据的文件路径", filemode.create)) // 新建一个文件流用于存放我们获取到数据
 7 {
 8     while (await iter.movenext()) // 迭代
 9     {
10         iter.current.content.writeto(fs); // 将数据写入到文件流中
11     }
12 }

测试

dotnet run

会发现,我们想要获取的文件的数据被不断地写到我们指定的文件中,每次 1 mb。在我的电脑上测试,内网的环境下传输速度大概 80~90 mb/s,几乎跑满了我的千兆网卡,速度非常理想。