如何应用C#实现UDP的分包组包
程序员文章站
2023-12-13 11:52:22
场景介绍如果需要使用udp传输较大数据,例如传输10m的图片,这突破了udp的设计原则。udp的设计是基于"datagram",也就是它假设你发送的每个数据包都能包含在单一...
场景介绍
如果需要使用udp传输较大数据,例如传输10m的图片,这突破了udp的设计原则。udp的设计是基于"datagram",也就是它假设你发送的每个数据包都能包含在单一的包内。并且设定udp数据包的最大长度受基础网络协议的限制。
udp数据包的理论最大长度限制是 65535 bytes,这包含 8 bytes 数据包头和 65527 bytes 数据。但如果基于ipv4网络传输,则还需减去 20 bytes 的ip数据包头。
则单一的udp数据包可传输的数据最大长度为:
则单一的udp数据包可传输的数据最大长度为:
maxudpdatalength = 65535 - 8 - 20 = 65507 bytes
这就需要实现udp包的分包传输和接收组包功能。
分包功能
复制代码 代码如下:
/// <summary>
/// udp数据包分割器
/// </summary>
public static class udppacketsplitter
{
/// <summary>
/// 分割udp数据包
/// </summary>
/// <param name="sequence">udp数据包所持有的序号</param>
/// <param name="datagram">被分割的udp数据包</param>
/// <param name="chunklength">分割块的长度</param>
/// <returns>
/// 分割后的udp数据包列表
/// </returns>
public static icollection<udppacket> split(long sequence, byte[] datagram, int chunklength)
{
if (datagram == null)
throw new argumentnullexception("datagram");
list<udppacket> packets = new list<udppacket>();
int chunks = datagram.length / chunklength;
int remainder = datagram.length % chunklength;
int total = chunks;
if (remainder > 0) total++;
for (int i = 1; i <= chunks; i++)
{
byte[] chunk = new byte[chunklength];
buffer.blockcopy(datagram, (i - 1) * chunklength, chunk, 0, chunklength);
packets.add(new udppacket(sequence, total, i, chunk, chunklength));
}
if (remainder > 0)
{
int length = datagram.length - (chunklength * chunks);
byte[] chunk = new byte[length];
buffer.blockcopy(datagram, chunklength * chunks, chunk, 0, length);
packets.add(new udppacket(sequence, total, total, chunk, length));
}
return packets;
}
}
发送分包
复制代码 代码如下:
private void workthread()
{
while (isrunning)
{
waiter.waitone();
waiter.reset();
while (queue.count > 0)
{
streampacket packet = null;
if (queue.trydequeue(out packet))
{
rtppacket rtppacket = rtppacket.fromimage(
rtppayloadtype.jpeg,
packet.sequencenumber,
(long)epoch.getdatetimetotalmillisecondsbyyesterday(packet.timestamp),
packet.frame);
// max udp packet length limited to 65,535 bytes
byte[] datagram = rtppacket.toarray();
packet.frame.dispose();
// split udp packet to many packets
// to reduce the size to 65507 limit by underlying ipv4 protocol
icollection<udppacket> udppackets
= udppacketsplitter.split(
packet.sequencenumber,
datagram,
65507 - udppacket.headersize);
foreach (var udppacket in udppackets)
{
byte[] udppacketdatagram = udppacket.toarray();
// async sending
udpclient.beginsend(
udppacketdatagram, udppacketdatagram.length,
packet.destination.address,
packet.destination.port,
sendcompleted, udpclient);
}
}
}
}
}
接收组包功能
复制代码 代码如下:
private void ondatagramreceived(object sender, udpdatagramreceivedeventargs<byte[]> e)
{
try
{
udppacket udppacket = udppacket.fromarray(e.datagram);
if (udppacket.total == 1)
{
rtppacket packet = new rtppacket(udppacket.payload, udppacket.payloadsize);
bitmap bitmap = packet.tobitmap();
raisenewframeevent(
bitmap, epoch.getdatetimebyyesterdaytotalmilliseconds(packet.timestamp));
}
else
{
// rearrange packets to one packet
if (packetcache.containskey(udppacket.sequence))
{
list<udppacket> udppackets = null;
if (packetcache.trygetvalue(udppacket.sequence, out udppackets))
{
udppackets.add(udppacket);
if (udppackets.count == udppacket.total)
{
packetcache.tryremove(udppacket.sequence, out udppackets);
udppackets = udppackets.orderby(u => u.order).tolist();
int rtppacketlength = udppackets.sum(u => u.payloadsize);
int maxpacketlength = udppackets.select(u => u.payloadsize).max();
byte[] rtppacket = new byte[rtppacketlength];
foreach (var item in udppackets)
{
buffer.blockcopy(
item.payload, 0, rtppacket,
(item.order - 1) * maxpacketlength, item.payloadsize);
}
rtppacket packet = new rtppacket(rtppacket, rtppacket.length);
bitmap bitmap = packet.tobitmap();
raisenewframeevent(
bitmap,
epoch.getdatetimebyyesterdaytotalmilliseconds(packet.timestamp));
packetcache.clear();
}
}
}
else
{
list<udppacket> udppackets = new list<udppacket>();
udppackets.add(udppacket);
packetcache.addorupdate(
udppacket.sequence,
udppackets, (k, v) => { return udppackets; });
}
}
}
catch (exception ex)
{
raisevideosourceexceptionevent(ex.message);
}
}