使用高性能Pipelines构建.NET通讯程序
.net standard支持一组新的api,system.span
pipelines旨在解决.net编写socket通信程序时的很多,相信读者也对此不胜其烦,使用stream模型进行编程,就算能够解决,也是实在麻烦。 system.io.pipelines使用简单的内存片段来管理数据,可以极大的简化编写程序的过程。关于pipelines的详细介绍,可以看看。现在asp.net core中使用的kestrel已经在使用这个api。(话说这个东西貌似就是kestrel团队搞出来的。) 可能是直接需要用socket场景有限(物联网用的还挺多的),pipelines相关的资料感觉不是很多。官方给出的示例是基于ascii协议的,有固定结尾的协议,这里我以物联网设备常用的binary二进制自定义协议为例,讲解基于pipelines的程序套路。 与基于stream的方式不同,pipelines提供一个pipe,用于存储数据,pipe中间存储的数据有点链表的感觉,可以基于 接受数据循环:接到数据->放pipe里面->告诉pipe放了多少数据 有一款设备,binary协议,数据包开头0x75, 0xbd, 0x7e, 0x97一共4个字节,随后跟数据包长度2个字节(固定2400字节,不固定长度也可以参照),随后是数据区。在设备连接成功之后,数据主动从设备发送到pc。 虽然是.net core平台的,但是.net framework 4.6.1上面也可以nuget安装,直接 进行安装就可以了。socket相关处理的代码不再写了,只列关键的。 代码第一步是声明pipe。 pipe有reader还有一个writer,reader负责读取pipe数据,主要用在数据处理循环,writer负责将数据写入pipe,主要用在数据接受循环。 以上代码基本解决了以下问题: 本文只是解释了pipeline处理的模式,对于茫茫多的toarray方法,可以使用基于span的操作进行优化(有时间就来填坑)。另外,如果在system.io.pipelines
sequenceposition
进行slice操作,这样就能得到一个readonlysequence<t>
对象。reader可以进行自定义操作,并在操作完成之后告诉pipe已经处理了多少数据,整个过程是不需要进行内存复制操作的,因此性能得到了提升,还少了很多麻烦。可以简单理解作为服务器端,流程:
处理数据循环:在pipe里面找一条完整数据->交给处理流程->告诉pipe处理了多少数据协议
关键代码
install-package system.io.pipelines
private async void initpipe(socket socket)
{
pipe pipe = new pipe();
task writing = fillpipeasync(socket, pipe.writer);
task reading = readpipeasync(socket, pipe.reader);
await task.whenall(reading, writing);
}
//写入循环
private async task fillpipeasync(socket socket, pipewriter writer)
{
//数据流量比较大,用1m字节作为buffer
const int minimumbuffersize = 1024 * 1024;
while (running)
{
try
{
//从writer中,获得一段不少于指定大小的内存空间
memory<byte> memory = writer.getmemory(minimumbuffersize);
//将内存空间变成arraysegment,提供给socket使用
if (!memorymarshal.trygetarray((readonlymemory<byte>)memory, out arraysegment<byte> arraysegment))
{
throw new invalidoperationexception("buffer backed by array was expected");
}
//接受数据
int bytesread = await sockettaskextensions.receiveasync(socket, arraysegment, socketflags.none);
if (bytesread == 0)
{
break;
}
//一次接受完毕,数据已经在pipe中,告诉pipe已经给它写了多少数据。
writer.advance(bytesread);
}
catch
{
break;
}
// 提示reader可以进行读取数据,reader可以继续执行readasync()方法
flushresult result = await writer.flushasync();
if (result.iscompleted)
{
break;
}
}
// 告诉pipe完事了
writer.complete();
}
//读取循环
private async task readpipeasync(socket socket, pipereader reader)
{
while (running)
{
//等待writer写数据
readresult result = await reader.readasync();
//获得内存区域
readonlysequence<byte> buffer = result.buffer;
sequenceposition? position = null;
do
{
//寻找head的第一个字节所在的位置
position = buffer.positionof((byte)0x75);
if (position != null)
{
//由于是连续四个字节作为head,需要进行比对,我这里直接使用了toarray方法,还是有了内存拷贝动作,不是很理想,但是写起来很方便。
//对性能有更高要求的场景,可以进行slice操作后的单独比对,这样不需要内存拷贝动作
var headtocheck = buffer.slice(position.value, 4).toarray();
//sequenceequal需要引用system.linq
if (headtocheck.sequenceequal(new byte[] { 0x75, 0xbd, 0x7e, 0x97 }))
{
//到这里,认为找到包开头了(从position.value开始),接下来需要从开头处截取整包的长度,需要先判断长度是否足够
if (buffer.slice(position.value).length >= 2400)
{
//长度足够,那么取出readonlysequence,进行操作
var mes = buffer.slice(position.value, 2400);
//这里是数据处理的函数,可以参考官方文档对readonlysequence进行操作,文档里面使用了span,那样性能会好一些。我这里简单实用toarray()操作,这样也有了内存拷贝的问题,但是处理的直接是byte数组了。
await processmessage(mes.toarray());
//这一段就算是完成了,从开头位置,一整个包的长度就算完成了
var next = buffer.getposition(2400, position.value);
//将buffer处理过的舍弃,替换为剩余的buffer引用
buffer = buffer.slice(next);
}
else
{
//长度不够,说明数据包不完整,等下一波数据进来再拼接,跳出循环。
break;
}
}
else
{
//第一个是0x75但是后面不匹配,可能有数据传输问题,那么需要舍弃第一个,0x75后面的字节开始再重新找0x75
var next = buffer.getposition(1, position.value);
buffer = buffer.slice(next);
}
}
}
while (position != null);
//数据处理完毕,告诉pipe还剩下多少数据没有处理(数据包不完整的数据,找不到head)
reader.advanceto(buffer.start, buffer.end);
if (result.iscompleted)
{
break;
}
}
reader.complete();
}
后记
await processmessage(mes.toarray());
这里,直接使用task.run(()=>processmessage(mes);
代替的话,实测会出现莫名其妙的问题,很有可能是pipe运行快,在系统调度task之前,已经将内存释放导致的,如果需要优化这一块的话,需要格外注意。