欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

粘包处理现象及其解决方案——基于NewLife.Net网络库的管道式帧长粘包处理方法

程序员文章站 2022-06-27 20:47:10
粘包处理现象及其解决方案——基于NewLife.Net网络库的管道式帧长粘包处理方法 [toc] 1.粘包现象 每个TCP 长连接都有自己的socket缓存buffer,默认大小是8K,可支持手动设置。粘包是TCP长连接中最常见的现象,如下图 socket缓存中有5帧(或者说5包)心跳数据,包头即F ......

粘包处理现象及其解决方案——基于newlife.net网络库的管道式帧长粘包处理方法

1.粘包现象

每个tcp 长连接都有自己的socket缓存buffer,默认大小是8k,可支持手动设置。粘包是tcp长连接中最常见的现象,如下图

粘包处理现象及其解决方案——基于NewLife.Net网络库的管道式帧长粘包处理方法

socket缓存中有5帧(或者说5包)心跳数据,包头即f0 aa 55 0f(十六进制),通过数包头数据我们确认出来缓存里有5帧心跳包,但是5帧数据彼此头尾相连粘合在了一起,这种常见的tcp缓存现象,我们称之为粘包。

2.粘包原因

2.1. 同一客户端连续发送

同一客户端连续发送心跳数据,当tcp服务端还来不及解析(如果解析完会把缓存清掉)。造成了同一缓存数据包的粘合。

2.2. 网络拥塞造成粘包

当某一时刻发生了网络拥塞,一会之后,突然网络畅通,tcp服务端收到同一客户端的多个心跳包,多个数据包会在tcp服务端的缓存中进行了粘合。

2.3. 服务端卡死了

当服务端因为计算量过大或者其他的原因,计算缓慢,来不及处理tcp socket缓存中的数据,多个心跳包(或者其他报文)也会在socket缓存中首尾相连,粘包。

总而言之,就是多个数据包在同一个tcp socket缓存中进行了首尾相连现象,即为粘包现象。

3. 粘包的危害

由于粘包现象存在的客观性,我们必须人为地在程序逻辑里将其区分,如果不去区分,任由各个数据包进行粘连,有以下几点危害:

3.1. 无法正确解析数据包

服务端会不断识别为无效包,告诉客户端,客户端会再次上报,因此会增加客户端服务端的运行压力,如果本身运算量很大,则会出现一些异常奔溃现象。

3.2. 错误数据包被错误解析

无巧不成书,如果错误的粘包,凑巧被服务端进行成功解析,则会进行错误的handler 处理。这样的错误处理方式危害会超过3.1。

3.3. 进入死循环

如果频率过快,则会出现这种现象,服务器不断识别粘包为无效包,客户端不断上报,以此消耗cpu的占用率。

综上,我们必须要进行tcp的粘包处理,这是软件系统健壮性跟异常处理机制的基础。

4. 粘包的逻辑处理方式

4.1. 根据包尾特征参数进行区分

规定几个字节为每帧tcp报文的包尾特征(比如4个字节),检索整个socket缓存字节,每当检测到包尾特征字节的时候,就划分报文,以此来正确分割粘包。
特征:需要检测每个字节,效率较低,适合短报文,如果报文很长则不适合。

4.2. 根据包头包尾特征参数进行区分

与4.1相似,多了包头检测部分。
特征:只需检测第一帧的每个字节,第二帧只需检测包头部分,适合长报文

4.3. 根据报文长度来进行粘包区分

根据报文长度偏置值,读第一帧的报文,从粘包中(socket缓存)划分出第一帧正确报文,找第二帧的报文长度,划分第二帧,以此划分到底。
举例:如下长度偏置为5(从0开始计算),即第6,第7字节为报文长度字节。

粘包处理现象及其解决方案——基于NewLife.Net网络库的管道式帧长粘包处理方法

特征:只需检测报文长度部分,适合长短报文的粘包划分。

5. 根据报文长度来区分粘包的代码落地——基于newlife.net的管道处理

5.1. newlife.net管道架构处理方式

newlife.net管道架构的设计,参考了java的netty开源框架,因此大部分netty的编解码器都可以在此使用。
具体在代码中的表现为

 _pemsserver.add(new stickpackagesplit { size = 2 });

即将lengthcodec这个编解码器加入到了管道中去,所有的message都会经过lengthcodec这里主要是解码功能,没有进行编码,解码成功后(粘包根据长度划分出多个有效包)推送到onreceive方法中去。size = 2表示报文长度是2个字节。

5.2. 跟http的管道类比

与net core 的webapi项目的管道添加,是否发现似曾相识?

  app.useauthentication();
  app.userequestlog();
  app.usecors(_defaultcorspolicyname);
  app.usemvc();

管道添加的先后顺序即数据流流经管道的顺序。只是没去追求是先有socket的管道处理机制,还是http 上下文的管道处理机制。但是道理是相同的。

5.3.拆分粘包解码器(根据长度解码)

5.3.1. 长度偏移地址offset属性

长度所在位置的偏移地址。默认为5,解释详见4.3。

        //
        // 摘要:
        //     长度所在位置
        public int offset
        {
            get;
            set;
        } = 5;

5.3.2.长度字节数size属性

本文讨论长度字节数为2,详见4.3

        //
        // 摘要:
        //     长度占据字节数,1/2/4个字节,0表示压缩编码整数,默认2
        public int size
        {
            get;
            set;
        } = 2;

5.3.3. 编码方法encode

        //
        // 摘要:
        //     编码,此应用不需要编码,只需解码,
        //     按长度将粘包划分成多个数据包
        //
        // 参数:
        //   context:
        //
        //   msg:
        protected override object encode(ihandlercontext context, packet msg)
       { 
           return msg;
       }

这里无需编码,故直接返回msg。

5.3.4. 解码方法decode

        //
        // 摘要:
        //     解码
        //
        // 参数:
        //   context:
        //
        //   pk:
        protected override ilist<packet> decode(ihandlercontext context, packet pk)
        {
            iextend extend = context.owner as iextend;

            lengthcodec packetcodec = extend["codec"] as lengthcodec;
           
            if (packetcodec == null)
            {
                iextend extend2 = extend;
                lengthcodec obj = new lengthcodec
                {
                    expire = expire,
                    getlength = ((packet p) => messagecodec<packet>.getlength(p, offset, size))
                };
                packetcodec = obj;
                extend2["codec"] = obj;
            }
            
            console.writeline("报文解码前:{0}", bitconverter.tostring(pk.toarray()));
            ilist<packet> list = packetcodec.parse(pk);
            console.writeline("报文解码");
            foreach (var item in list)
            {
                console.writeline("粘包处理结果:{0}", bitconverter.tostring(item.toarray()));
            }

            return list;
        }

5.3.4.1.解码步骤1——实例化长度解码器对象

实例化长度解码器完成之后,并将其添加到字典中去。

    iextend extend2 = extend;
    lengthcodec obj = new lengthcodec
    {
        expire = expire,
        getlength = ((packet p) => messagecodec<packet>.getlength(p, offset, size))
    };
    packetcodec = obj;
    extend2["codec"] = obj;

5.3.4.2.解码步骤2——将解码前的报文打印

此步骤非必须,为了最后能让读者看到效果增加。

    console.writeline("报文解码前:{0}", bitconvertetostring(pk.toarray()));

5.3.4.3.解码步骤3——将报文进行解码

 ilist<packet> list = packetcodec.parse(pk);

解码代码如下:

        //
        // 摘要:
        //     分析数据流,得到一帧数据
        //
        // 参数:
        //   pk:
        //     待分析数据包
        public virtual ilist<packet> parse(packet pk)
        {
            memorystream stream = stream;
            bool num = stream == null || stream.position < 0 || stream.position >= stream.length;
            list<packet> list = new list<packet>();


            if (num)
            {

                if (pk == null)
                {
                    return list.toarray();
                }
                int i;
                int num2;

                for (i = 0; i < pk.total; i += num2)
                {
                    packet packet = pk.slice(i);

                    num2 = getlength(packet);

                    console.writeline(" pk. getlength(packet):{0}", num2);

                    if (num2 <= 0 || num2 > packet.total)
                    {
                        break;
                    }
                    packet.set(packet.data, packet.offset, num2);
                    list.add(packet);
                }


                if (i == pk.total)
                {
                  
                    return list.toarray();
                }
                pk = pk.slice(i);
            }

            lock (this)
            {
                checkcache();
                stream = stream;
                if (pk != null && pk.total > 0)
                {
                    long position = stream.position;
                    stream.position = stream.length;
                    pk.copyto(stream);
                    stream.position = position;
                }
                while (stream.position < stream.length)
                {
                    packet packet2 = new packet(stream);
                    int num3 = getlength(packet2);
                    if (num3 <= 0 || num3 > packet2.total)
                    {
                        break;
                    }
                    packet2.set(packet2.data, packet2.offset, num3);
                    list.add(packet2);
                    stream.seek(num3, seekorigin.current);
                }
                if (stream.position >= stream.length)
                {
                    stream.setlength(0l);
                    stream.position = 0l;
                }


                return list;
            }
        }

解码核心代码如下:
即获得每帧报文的长度,通过委托方法 getlength(packet),然后循环所有粘包报文,根据每帧报文的长度分割保存到list中去,最后返回list。list的每个元素会触发message接收事件。

委托的使用请敬请关注下一篇,委托代码详见6.

    for (i = 0; i < pk.total; i += num2)
    {
        packet packet = pk.slice(i);

        num2 = getlength(packet);

        console.writeline(" pk. getlength(packet):{0}", num2);

        if (num2 <= 0 || num2 > packet.total)
        {
            break;
        }
        packet.set(packet.data, packet.offset, num2);
        list.add(packet);
    }

5.3.4.4.将粘包处理结果进行打印

    foreach (var item in list)
    {
        console.writeline("粘包处理结果:{0}"bitconverter.tostring(item.toarray()));
    }

5.3.5.清空粘包编码器

该方法由newlife.net网络库调用,我们无需关心。

    //
    // 摘要:
    //     连接关闭时,清空粘包编码器
    //
    // 参数:
    //   context:
    //
    //   reason:
    public override bool close(ihandlercontext contextstring reason)
    {
        iextend extend = context.owner as iextend;
        if (extend != null)
        {
            extend["codec"] = null;
        }
        return base.close(context, reason);
    }

5.3.6.完整拆分粘包解码器代码

    // 摘要:
    //     长度字段作为头部
    // 
    public class stickpackagesplit : messagecodec<packet>
    {
        //
        // 摘要:
        //     长度所在位置
        public int offset
        {
            get;
            set;
        } = 5;

        //
        // 摘要:
        //     长度占据字节数,1/2/4个字节,0表示压缩编码整数,默认2
        public int size
        {
            get;
            set;
        } = 2;


        //
        // 摘要:
        //     过期时间,超过该时间后按废弃数据处理,默认500ms
        public int expire
        {
            get;
            set;
        } = 500;


        //
        // 摘要:
        //     编码,此应用不需要编码,只需解码,
        //     按长度将粘包划分成多个数据包
        //
        // 参数:
        //   context:
        //
        //   msg:
        protected override object encode(ihandlercontext context, packet msg)
       { 
           return msg;
       }

        //
        // 摘要:
        //     解码
        //
        // 参数:
        //   context:
        //
        //   pk:
        protected override ilist<packet> decode(ihandlercontext context, packet pk)
        {
            iextend extend = context.owner as iextend;

            lengthcodec packetcodec = extend["codec"] as lengthcodec;
           

            if (packetcodec == null)
            {
                iextend extend2 = extend;
                lengthcodec obj = new lengthcodec
                {
                    expire = expire,
                    getlength = ((packet p) => messagecodec<packet>.getlength(p, offset, size))
                };
                packetcodec = obj;
                extend2["codec"] = obj;
            }
            
            console.writeline("报文解码前:{0}", bitconverter.tostring(pk.toarray()));
            ilist<packet> list = packetcodec.parse(pk);
            console.writeline("报文解码");
            foreach (var item in list)
            {
                console.writeline("粘包处理结果:{0}", bitconverter.tostring(item.toarray()));
            }

            return list;
        }

        //
        // 摘要:
        //     连接关闭时,清空粘包编码器
        //
        // 参数:
        //   context:
        //
        //   reason:
        public override bool close(ihandlercontext context, string reason)
        {
            iextend extend = context.owner as iextend;
            if (extend != null)
            {
                extend["codec"] = null;
            }
            return base.close(context, reason);
        }
    }

6.长度计算委托getlength

5.3.6中会调用如下每个包的长度计算委托。关于委托的使用方法会在下一篇讲解,这里不再展开。

//
// 摘要:
//     从数据流中获取整帧数据长度
//
// 参数:
//   pk:
//
//   offset:
//
//   size:
//
// 返回结果:
//     数据帧长度(包含头部长度位)
protected static int getlength(packet pk, int offsetint size)
{
    if (offset < 0)
    {
        return pk.total - pk.offset;
    }
    int offset2 = pk.offset;
    if (offset >= pk.total)
    {
        return 0;
    }
    int num = 0;
    switch (size)
    {
        case 0:
            {
                memorystream stream = pk.getstream();
                if (offset > 0)
                {
                    stream.seek(offset, seekorigicurrent);
                }
                num = stream.readencodedint();
                num += (int)(stream.position - offset);
                break;
            }
        case 1:
            num = pk[offset];
            break;
        case 2:
            num = pk.readbytes(offset, 2).touint16();
            break;
        case 4:
            num = (int)pk.readbytes(offset, 4).touint32;
            break;
        case -2:
            num = pk.readbytes(offset, 2).touint16(0islittleendian: false);
            break;
        case -4:
            num = (int)pk.readbytes(offset, 4).touint(0, islittleendian: false);
            break;
        default:
            throw new notsupportedexception();
    }
    if (num > pk.total)
    {
        return 0;
    }          
    return num;
}

7.最终粘包拆分效果图

粘包处理现象及其解决方案——基于NewLife.Net网络库的管道式帧长粘包处理方法


版权声明:本文为博主原创文章,遵循 cc 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://www.cnblogs.com/jerrymouseli/p/12659903.html