欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

c#中关于udp实现可靠地传输(数据包的分组发送)

程序员文章站 2022-04-13 13:45:17
...
在做c#中面向无连接的传输时用到了UDP,虽然没有TCP稳定可靠。但是效率是要高些,优势也有,缺点也有

就是有的时候要丢包,有的时候不得不用UDP,但是如何才能比较稳定的实现可靠传输呢,这是一个问题。

TCP传输数据的时候没有大小限制,但是UDP传输的时候是有大小限制的,我们怎么才能够实现大数据的稳定传输呢。我们想到了,把数据包分包。

把一个大数据分割为一系列的小数据包然后分开发送,然后服务端收到了就拼凑起完整数据。

如果遇到中途丢包就重发。

UDP线程类,实现数据的分包发送和重发。具体的接收操作需要实现其中的事件

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using Model;
using System.Net;
using Tool;
using System.Threading;

namespace ZZUdp.Core
{
    //udp的类  
    public class UDPThread
    {
        #region 私有变量

        UdpClient client;//UDP客户端

        List<UdpPacket> sendlist;// 用于轮询是否发送成功的记录

        Dictionary<long, RecDataList> RecListDic = new Dictionary<long, RecDataList>();//数据接收列表,每一个sequence对应一个

        IPEndPoint remotIpEnd = null;//用来在接收数据的时候对远程主机的信息存放

        int port=6666;//定义服务器的端口号
        #endregion

        #region 属性
        public int CheckQueueTimeInterval { get; set; }//检查发送队列间隔

        public int MaxResendTimes { get; set; }//没有收到确认包时,最大重新发送的数目,超过此数目会丢弃并触发PackageSendFailture事件
        #endregion

        #region 事件
        
        /// <summary>
        /// 当数据包收到时触发
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageReceived;
        /// <summary>
        /// 当数据包收到事件触发时,被调用
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageReceived(PackageEventArgs e)
        {
            if (PackageReceived != null) 
                PackageReceived(this, e);
        }

        /// <summary>
        /// 数据包发送失败
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageSendFailure;
        /// <summary>
        /// 当数据发送失败时调用
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageSendFailure(PackageEventArgs e)
        {
            if (PackageSendFailure != null) 
                PackageSendFailure(this, e);
        }

        /// <summary>
        /// 数据包未接收到确认,重新发送
        /// </summary>
        public event EventHandler<PackageEventArgs> PackageResend;
        /// <summary>
        /// 触发重新发送事件
        /// </summary>
        /// <param name="e">包含事件的参数</param>
        protected virtual void OnPackageResend(PackageEventArgs e)
        {
            if (PackageResend != null) 
                PackageResend(this, e);
        }
        #endregion


        //无参构造函数
        public UDPThread()
        { 
        }
        //构造函数
        public UDPThread(string ipaddress, int port)
        {
            IPAddress ipA = IPAddress.Parse(ipaddress);//构造远程连接的参数
            IPEndPoint ipEnd = new IPEndPoint(ipA, port);
            client = new UdpClient();// client = new UdpClient(ipEnd)这样的话就没有创建远程连接
            client.Connect(ipEnd);//使用指定的远程主机信息建立默认远程主机连接
            sendlist = new List<UdpPacket>();

            CheckQueueTimeInterval = 2000;//轮询间隔时间
            MaxResendTimes = 5;//最大发送次数
            
            new Thread(new ThreadStart(CheckUnConfirmedQueue)) { IsBackground = true }.Start();//启动轮询线程
            //开始监听数据
            AsyncReceiveData();
        }
        /// <summary>
        /// 同步数据接收方法
        /// </summary>
        public void ReceiveData()
        {
            while (true)
            {
                IPEndPoint retip = null;
                UdpPacket udpp = null;
                try
                {
                    byte[] data = client.Receive(ref retip);//接收数据,当Client端连接主机的时候,retip就变成Cilent端的IP了
                    udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
                }
                catch (Exception ex)
                {
                    //异常处理操作
                }
                if (udpp != null)
                {
                    PackageEventArgs arg = new PackageEventArgs(udpp, retip);
                    OnPackageReceived(arg);//数据包收到触发事件
                }
            }
        }

        //异步接受数据
        public void AsyncReceiveData()
        {
            try
            {
                client.BeginReceive(new AsyncCallback(ReceiveCallback), null);
            }
            catch (SocketException ex)
            {
                throw ex;
            }
        }
        //接收数据的回调函数
        public void ReceiveCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                UdpPacket udpp = null;
                try
                {
                    byte[] data = client.EndReceive(param, ref remotIpEnd);//接收数据,当Client端连接主机的时候,test就变成Cilent端的IP了
                    udpp = (UdpPacket)SerializationUnit.DeserializeObject(data);
                }
                catch (Exception ex)
                {
                    //异常处理操作
                }
                finally
                {
                    AsyncReceiveData();
                }
                if (udpp != null)//触发数据包收到事件
                {
                    PackageEventArgs arg = new PackageEventArgs(udpp, null);
                    OnPackageReceived(arg);
                }
            }
        }


        /// <summary>
        /// 同步发送分包数据
        /// </summary>
        /// <param name="message"></param>
        public void SendData(Msg message)
        {
           
            ICollection<UdpPacket> udpPackets = UdpPacketSplitter.Split(message);
            foreach (UdpPacket udpPacket in udpPackets)
            {
                byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
                //使用同步发送
               client.Send(udpPacketDatagram, udpPacketDatagram.Length,udpPacket.remoteip);
               if (udpPacket.IsRequireReceiveCheck)
                   PushSendItemToList(udpPacket);//将该消息压入列表

            }
        }
        
        /// <summary>
        /// 异步分包发送数组的方法
        /// </summary>
        /// <param name="message"></param>
        public void AsyncSendData(Msg message)
        {
            
            ICollection<UdpPacket> udpPackets = UdpPacketSplitter.Split(message);
            foreach (UdpPacket udpPacket in udpPackets)
            {
                byte[] udpPacketDatagram = SerializationUnit.SerializeObject(udpPacket);
                //使用同步发送
                //client.Send(udpPacketDatagram, udpPacketDatagram.Length);

                //使用异步的方法发送数据
                this.client.BeginSend(udpPacketDatagram, udpPacketDatagram.Length, new AsyncCallback(SendCallback), null);

            }
        }
        //发送完成后的回调方法
        public void SendCallback(IAsyncResult param)
        {
            if (param.IsCompleted)
            {
                try
                {
                    client.EndSend(param);//这句话必须得写,BeginSend()和EndSend()是成对出现的 
                }
                catch (Exception e)
                {
                    //其他处理异常的操作
                }
            }

        }
        static object lockObj = new object();
        /// <summary>
        /// *线程,检测未发送的数据并发出,存在其中的就是没有收到确认包的数据包
        /// </summary>
        void CheckUnConfirmedQueue()
        {
            do
            {
                if (sendlist.Count > 0)
                {
                    UdpPacket[] array = null;

                    lock (sendlist)
                    {
                        array = sendlist.ToArray();
                    }
                    //挨个重新发送并计数
                    Array.ForEach(array, s =>
                    {
                        s.sendtimes++;
                        if (s.sendtimes >= MaxResendTimes)
                        {
                            //sOnPackageSendFailure//出发发送失败事件
                            sendlist.Remove(s);//移除该包
                        }
                        else
                        {
                            //重新发送
                            byte[] udpPacketDatagram = SerializationUnit.SerializeObject(s);
                            client.Send(udpPacketDatagram, udpPacketDatagram.Length, s.remoteip);
                        }
                    });
                }

               Thread.Sleep(CheckQueueTimeInterval);//间隔一定时间重发数据
            } while (true);
        }
        /// <summary>
        /// 将数据信息压入列表
        /// </summary>
        /// <param name="item"></param>
        void PushSendItemToList(UdpPacket item)
        {
            sendlist.Add(item);
        }
        /// <summary>
        /// 将数据包从列表中移除
        /// </summary>
        /// <param name="packageNo">数据包编号</param>
        /// <param name="packageIndex">数据包分包索引</param>
        public void PopSendItemFromList(long packageNo, int packageIndex)
        {
            lock (lockObj)
            {
                Array.ForEach(sendlist.Where(s => s.sequence == packageNo && s.index == packageIndex).ToArray(), s => sendlist.Remove(s));
            }
        }
        /// <summary>
        /// 关闭客户端并释放资源
        /// </summary>
        public void Dispose()
        {
            if (client != null)
            {
                client.Close();
                client = null;
            }
        }



    }
}

首先是数据信息实体类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;

namespace Model
{
    //封装消息类
    [Serializable]
   public class Msg
    {
        //所属用户的用户名
        public string name { get; set; }

        //所属用户的ip
        public string host { get; set; }

        //命令的名称
        public string command { get; set; }

        //收信人的姓名
        public string desname { get; set; }

        //你所发送的消息的目的地ip,应该是对应在服务器的列表里的主键值
        public string destinationIP { get; set; }

        //端口号
        public int port { get; set; }

        //文本消息
        public string msg { get; set; }

        //二进制消息
        public byte[] byte_msg { get; set; }

        //附加数据
        public string extend_msg { get; set; }

        //时间戳
        public DateTime time { get; set; }

        //构造函数
        public Msg(string command,string desip,string msg,string host)
        {
            this.command = command;
            this.destinationIP = desip;
            this.msg = msg;
            this.time = DateTime.Now;
            this.host = host;
        }
        override
        public string ToString()
        {
            return name + "说:" + msg;
        }
    }
}

MSG数据分割后生成分包数据

分包实体类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using System.Net;

namespace Model
{
    [Serializable]
    public class UdpPacket
    {
        public long sequence{get;set;}//所属组的唯一序列号 包编号
        public int total { get; set; }//分包总数
        public int index { get; set; }//消息包的索引
        public byte[] data { get; set; }//包的内容数组
        public int dataLength { get; set; }//分割的数组包大小
        public int remainder { get; set; }//最后剩余的数组的数据长度
        public int sendtimes { get; set; }//发送次数
        public IPEndPoint remoteip { get; set; }//接受该包的远程地址
        public bool IsRequireReceiveCheck { get; set; }//获得或设置包收到时是否需要返回确认包
        public static int HeaderSize = 30000;
        public UdpPacket(long sequence, int total, int index, byte[] data, int dataLength, int remainder,string desip,int port)
        {
            this.sequence = sequence;
            this.total = total;
            this.index = index;
            this.data = data;
            this.dataLength = dataLength;
            this.remainder = remainder;
            this.IsRequireReceiveCheck = true;//默认都需要确认包
            //构造远程地址
            IPAddress ipA = IPAddress.Parse(desip);
            this.remoteip = new IPEndPoint(ipA, port);
        }
        //把这个对象生成byte[]
        public byte[] ToArray()
        {
            return SerializationUnit.SerializeObject(this);
        }
    }
}

数据包分割工具类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;

namespace Model
{
        /// <summary>
        /// UDP数据包分割器
        /// </summary>
        public static class UdpPacketSplitter
        {


            public static ICollection<UdpPacket> Split(Msg message)
            {
                byte[] datagram = null;
                try
                {
                    datagram = SerializationUnit.SerializeObject(message);
                }
                catch (Exception e)
                {
                    //AddTalkMessage("数据转型异常");
                }
                //产生一个序列号,用来标识包数据属于哪一组
                Random Rd = new Random();
                long SequenceNumber = Rd.Next(88888, 999999);
                ICollection<UdpPacket> udpPackets = UdpPacketSplitter.Split(SequenceNumber, datagram, 10240, message.destinationIP, message.port);

                return udpPackets;
            }
            /// <summary>
            /// 分割UDP数据包
            /// </summary>
            /// <param name="sequence">UDP数据包所持有的序号</param>
            /// <param name="datagram">被分割的UDP数据包</param>
            /// <param name="chunkLength">分割块的长度</param>
            /// <returns>
            /// 分割后的UDP数据包列表
            /// </returns>
            public static ICollection<UdpPacket> Split(long sequence, byte[] datagram, int chunkLength,string desip,int port)
            {
                if (datagram == null)
                    throw new ArgumentNullException("datagram");

                List<UdpPacket> packets = new List<UdpPacket>();

                int chunks = datagram.Length / chunkLength;
                int remainder = datagram.Length % chunkLength;
                int total = chunks;
                if (remainder > 0) total++;

                for (int i = 1; i <= chunks; i++)
                {
                    byte[] chunk = new byte[chunkLength];
                    Buffer.BlockCopy(datagram, (i - 1) * chunkLength, chunk, 0, chunkLength);
                    packets.Add(new UdpPacket(sequence, total, i, chunk, chunkLength, remainder, desip, port));
                }
                if (remainder > 0)
                {
                    int length = datagram.Length - (chunkLength * chunks);
                    byte[] chunk = new byte[length];
                    Buffer.BlockCopy(datagram, chunkLength * chunks, chunk, 0, length);
                    packets.Add(new UdpPacket(sequence, total, total, chunk, chunkLength, remainder, desip, port));
                }

                return packets;
            }
        }
}

服务端存储数据的数据结构

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Tool;
using Model;

namespace Model
{
    //一个sequence对应一组的数据包的数据结构
    public class RecDataList
    {
        public long sequence { get; set; }//序列号
        //对应的存储包的List
        List<UdpPacket> RecudpPackets = new List<UdpPacket>();
        public int total { get; set; }
        public int dataLength { get; set; }
        public int remainder { get; set; }
        public byte[] DataBuffer = null;
        public RecDataList(UdpPacket udp)
        {

            this.sequence = udp.sequence;
            this.total = udp.total;
            this.dataLength = udp.dataLength;
            this.remainder = udp.remainder;
            if (DataBuffer == null)
            {
                DataBuffer = new byte[dataLength * (total - 1) + remainder];
            }
        }
        public RecDataList(long sequence, int total, int chunkLength, int remainder)
        {
            
            this.sequence = sequence;
            this.total = total;
            this.dataLength = chunkLength;
            this.remainder = remainder;
            if (DataBuffer == null)
            {
                DataBuffer = new byte[this.dataLength * (this.total - 1) + this.remainder];
            }
        }
        public void addPacket(UdpPacket p)
        {
            RecudpPackets.Add(p);
        }
        public Msg show() 
        {
            if (RecudpPackets.Count == total)//表示已经收集满了
            {
                //重组数据
                foreach (UdpPacket udpPacket in RecudpPackets)
                {
                    //偏移量
                    int offset = (udpPacket.index - 1) * udpPacket.dataLength;
                    Buffer.BlockCopy(udpPacket.data, 0, DataBuffer, offset, udpPacket.data.Length);
                }
                Msg rmsg = (Msg)SerializationUnit.DeserializeObject(DataBuffer);
                DataBuffer = null;
                RecudpPackets.Clear();
                return rmsg;
            }
            else
            {
                return null;
            }
        }
        public bool containskey(UdpPacket udp)
        {
            foreach (UdpPacket udpPacket in RecudpPackets)
            {
                if (udpPacket.index == udp.index)
                    return true;
            }
            return false;
        }
    }
}

编码工具类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;

namespace Tool
{
    public class EncodingTool
    {
        //编码
        public static byte[] EncodingASCII(string buf)
        {
            byte[] data = Encoding.Unicode.GetBytes(buf);
            return data;
        }
        //解码
        public static string DecodingASCII(byte[] bt)
        {
            string st = Encoding.Unicode.GetString(bt);
            return st;
        }



        //编码
        public static byte[] EncodingUTF_8(string buf)
        {
            byte[] data = Encoding.UTF8.GetBytes(buf);
            return data;
        }
        //编码
        public static string DecodingUTF_8(byte[] bt)
        {
            string st = Encoding.UTF8.GetString(bt);
            return st;
        }
       
    }
}


序列化和反序列化的工具类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
namespace Tool
{

    public class SerializationUnit
    {
        /// <summary>  
        /// 把对象序列化为字节数组  
        /// </summary>  
        public static byte[] SerializeObject(object obj)
        {
            if (obj == null)
                return null;
            //内存实例
            MemoryStream ms = new MemoryStream();
            //创建序列化的实例
            BinaryFormatter formatter = new BinaryFormatter();
            formatter.Serialize(ms, obj);//序列化对象,写入ms流中  
            ms.Position = 0;
            //byte[] bytes = new byte[ms.Length];//这个有错误
            byte[] bytes = ms.GetBuffer();
            ms.Read(bytes, 0, bytes.Length);
            ms.Close();
            return bytes;
        }

        /// <summary>  
        /// 把字节数组反序列化成对象  
        /// </summary>  
        public static object DeserializeObject(byte[] bytes)
        {
            object obj = null;
            if (bytes == null)
                return obj;
            //利用传来的byte[]创建一个内存流
            MemoryStream ms = new MemoryStream(bytes);
            ms.Position = 0;
            BinaryFormatter formatter = new BinaryFormatter();
            obj = formatter.Deserialize(ms);//把内存流反序列成对象  
            ms.Close();
            return obj;
        }
        /// <summary>
        /// 把字典序列化
        /// </summary>
        /// <param name="dic"></param>
        /// <returns></returns>
        public static byte[] SerializeDic(Dictionary<string, object> dic)
        {
            if (dic.Count == 0)
                return null;
            MemoryStream ms = new MemoryStream();
            BinaryFormatter formatter = new BinaryFormatter();
            formatter.Serialize(ms, dic);//把字典序列化成流

            byte[] bytes = new byte[ms.Length];//从流中读出byte[]
            ms.Read(bytes, 0, bytes.Length);

            return bytes;
        }
        /// <summary>
        /// 反序列化返回字典
        /// </summary>
        /// <param name="bytes"></param>
        /// <returns></returns>
        public static Dictionary<string, object> DeserializeDic(byte[] bytes)
        {
            Dictionary<string, object> dic = null;
            if (bytes == null)
                return dic;
            //利用传来的byte[]创建一个内存流
            MemoryStream ms = new MemoryStream(bytes);
            ms.Position = 0;
            BinaryFormatter formatter = new BinaryFormatter();
            //把流中转换为Dictionary
            dic = (Dictionary<string, object>)formatter.Deserialize(ms);
            return dic;
        }
    }

}

通用的数据包事件类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Model;

namespace ZZUdp.Core
{
	/// <summary>
	/// 数据包事件数据
	/// </summary>
	public class PackageEventArgs : EventArgs
	{
		/// <summary>
		/// 网络消息包
		/// </summary>
		public UdpPacket udpPackage { get; set; }

		/// <summary>
		/// 网络消息包组
		/// </summary>
        public UdpPacket[] udpPackages { get; set; }

        /// <summary>
        /// 远程IP
        /// </summary>
        public IPEndPoint RemoteIP { get; set; }

		/// <summary>
		/// 是否已经处理
		/// </summary>
		public bool IsHandled { get; set; }

		/// <summary>
		/// 创建一个新的 PackageEventArgs 对象.
		/// </summary>
        public PackageEventArgs(UdpPacket package, IPEndPoint RemoteIP)
		{
            this.udpPackage = package;
            this.RemoteIP = RemoteIP;
			this.IsHandled = false;
		}
	}
}

以上就是c#中关于udp实现可靠地传输(数据包的分组发送) 的内容,更多相关内容请关注PHP中文网(www.php.cn)!

相关标签: c#,udp,传输