欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

如何应用C#实现UDP的分包组包

程序员文章站 2024-02-11 20:56:04
场景介绍如果需要使用udp传输较大数据,例如传输10m的图片,这突破了udp的设计原则。udp的设计是基于"datagram",也就是它假设你发送的每个数据包都能包含在单一...

场景介绍
如果需要使用udp传输较大数据,例如传输10m的图片,这突破了udp的设计原则。udp的设计是基于"datagram",也就是它假设你发送的每个数据包都能包含在单一的包内。并且设定udp数据包的最大长度受基础网络协议的限制。

如何应用C#实现UDP的分包组包

udp数据包的理论最大长度限制是 65535 bytes,这包含 8 bytes 数据包头和 65527 bytes 数据。但如果基于ipv4网络传输,则还需减去 20 bytes 的ip数据包头。
则单一的udp数据包可传输的数据最大长度为:

则单一的udp数据包可传输的数据最大长度为:

maxudpdatalength = 65535 - 8 - 20 = 65507 bytes

这就需要实现udp包的分包传输和接收组包功能。

分包功能

复制代码 代码如下:

/// <summary>
   /// udp数据包分割器
   /// </summary>
   public static class udppacketsplitter
   {
     /// <summary>
     /// 分割udp数据包
     /// </summary>
     /// <param name="sequence">udp数据包所持有的序号</param>
     /// <param name="datagram">被分割的udp数据包</param>
     /// <param name="chunklength">分割块的长度</param>
     /// <returns>
     /// 分割后的udp数据包列表
     /// </returns>
     public static icollection<udppacket> split(long sequence, byte[] datagram, int chunklength)
     {
       if (datagram == null)
         throw new argumentnullexception("datagram");

       list<udppacket> packets = new list<udppacket>();

       int chunks = datagram.length / chunklength;
       int remainder = datagram.length % chunklength;
       int total = chunks;
       if (remainder > 0) total++;

       for (int i = 1; i <= chunks; i++)
       {
         byte[] chunk = new byte[chunklength];
         buffer.blockcopy(datagram, (i - 1) * chunklength, chunk, 0, chunklength);
         packets.add(new udppacket(sequence, total, i, chunk, chunklength));
       }
       if (remainder > 0)
       {
         int length = datagram.length - (chunklength * chunks);
         byte[] chunk = new byte[length];
         buffer.blockcopy(datagram, chunklength * chunks, chunk, 0, length);
         packets.add(new udppacket(sequence, total, total, chunk, length));
       }

       return packets;
     }
   }

发送分包
复制代码 代码如下:

private void workthread()
 {
   while (isrunning)
   {
     waiter.waitone();
     waiter.reset();

     while (queue.count > 0)
     {
       streampacket packet = null;
       if (queue.trydequeue(out packet))
       {
         rtppacket rtppacket = rtppacket.fromimage(
           rtppayloadtype.jpeg,
           packet.sequencenumber,
           (long)epoch.getdatetimetotalmillisecondsbyyesterday(packet.timestamp),
           packet.frame);

         // max udp packet length limited to 65,535 bytes
         byte[] datagram = rtppacket.toarray();
         packet.frame.dispose();

         // split udp packet to many packets
         // to reduce the size to 65507 limit by underlying ipv4 protocol
         icollection<udppacket> udppackets
           = udppacketsplitter.split(
             packet.sequencenumber,
             datagram,
             65507 - udppacket.headersize);
         foreach (var udppacket in udppackets)
         {
           byte[] udppacketdatagram = udppacket.toarray();
           // async sending
           udpclient.beginsend(
             udppacketdatagram, udppacketdatagram.length,
             packet.destination.address,
             packet.destination.port,
             sendcompleted, udpclient);
         }
       }
     }
   }
 }

接收组包功能
复制代码 代码如下:

private void ondatagramreceived(object sender, udpdatagramreceivedeventargs<byte[]> e)
     {
       try
       {
         udppacket udppacket = udppacket.fromarray(e.datagram);

         if (udppacket.total == 1)
         {
           rtppacket packet = new rtppacket(udppacket.payload, udppacket.payloadsize);
           bitmap bitmap = packet.tobitmap();
           raisenewframeevent(
             bitmap, epoch.getdatetimebyyesterdaytotalmilliseconds(packet.timestamp));
         }
         else
         {
           // rearrange packets to one packet
           if (packetcache.containskey(udppacket.sequence))
           {
             list<udppacket> udppackets = null;
             if (packetcache.trygetvalue(udppacket.sequence, out udppackets))
             {
               udppackets.add(udppacket);

               if (udppackets.count == udppacket.total)
               {
                 packetcache.tryremove(udppacket.sequence, out udppackets);

                 udppackets = udppackets.orderby(u => u.order).tolist();
                 int rtppacketlength = udppackets.sum(u => u.payloadsize);
                 int maxpacketlength = udppackets.select(u => u.payloadsize).max();

                 byte[] rtppacket = new byte[rtppacketlength];
                 foreach (var item in udppackets)
                 {
                   buffer.blockcopy(
                     item.payload, 0, rtppacket,
                     (item.order - 1) * maxpacketlength, item.payloadsize);
                 }

                 rtppacket packet = new rtppacket(rtppacket, rtppacket.length);
                 bitmap bitmap = packet.tobitmap();
                 raisenewframeevent(
                   bitmap,
                   epoch.getdatetimebyyesterdaytotalmilliseconds(packet.timestamp));

                 packetcache.clear();
               }
             }
           }
           else
           {
             list<udppacket> udppackets = new list<udppacket>();
             udppackets.add(udppacket);
             packetcache.addorupdate(
               udppacket.sequence,
               udppackets, (k, v) => { return udppackets; });
           }
         }
       }
       catch (exception ex)
       {
         raisevideosourceexceptionevent(ex.message);
       }
     }