An Approach to Implementing a High-Performance Asynchronous Socket Wrapper Library in C#

程序员文章站 2023-12-18 23:24:52
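Preface

Sockets are one of the most common ways for software to communicate. C# offers many ways to implement socket communication, and the most efficient of them is asynchronous I/O.

Asynchronous socket I/O on Windows is built on I/O completion ports (IOCP). For how completion ports work, see the articles available online.

What I want to stress here is that asynchronous I/O based on completion ports is the most efficient communication model on Windows, bar none!

Asynchronous communication is much harder to get right than synchronous communication, and the code has many pitfalls. Without experience it is difficult to finish.

I collected a large amount of material and built a wrapper around asynchronous sockets. The library has been running stably and efficiently for several months.

Looking at what is available online, I have yet to find a wrapper library I am satisfied with. Many articles mix data transfer with protocol handling, which makes the code hard to follow and impossible to extend.

This library was written to avoid those flaws: the logic is layered and modular, and it aims for both ease of use and high performance.

To give a first impression of the throughput, here are the test results.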

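[Figure: throughput test results]

Host configuration

[Figure: host configuration]

The 100 Mbit/s link is essentially saturated at 40% CPU usage. My machine sits at about 20% CPU when idle, so the program itself accounts for roughly 20%.

The library is scalable: even with 100,000 connections sending and receiving the same amount of data, CPU usage stays essentially the same.

Library structure diagram

[Figure: library structure diagram]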

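Goals

It can be used both as a server (listening) and as a client (actively connecting).

It works with any network protocol. Data is sent and received either as a byte stream or as complete packets; the library does not interpret protocol content.

Ease of use. The complex low-level handling is encapsulated behind a friendly public interface.

High performance. Processing is optimized as far as possible. A single machine can support tens of thousands of connections, with throughput of several hundred Mbit/s.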

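Implementation approach

The network handling logic can be divided into the following parts:

Listening: The library can listen on multiple ports. The listener is responsible for producing sockets, which are handed on for further processing. The listening module has a single, narrow job and can be optimized further if necessary.

Active connection: A connection to a peer can be made asynchronously or synchronously. Once connected, the socket is handled exactly the same way as a socket obtained from listening.

Socket send and receive: Each socket has one send/receive instance, which deals only with byte streams. Both directions are optimized: on send, queued data is coalesced into larger buffers to improve throughput; on receive, a 1 KB buffer is posted each time.

Packet assembly: Packets usually carry a length indicator. For example, if the first two bytes of the header give the length, that value is enough to assemble a complete packet.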
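The packet assembly step can be sketched as follows. This is a minimal illustration, not the library's own `clientpacketmanage` (whose source is not shown in this article). The class name `LengthPrefixedFramer` is hypothetical, and the sketch assumes the two length bytes are big-endian and count only the payload that follows them.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the library: reassembles packets whose
// first two bytes hold the payload length (assumed big-endian, header excluded).
public class LengthPrefixedFramer
{
    private readonly List<byte> _pending = new List<byte>();

    // Feed one received chunk; returns the payload of every packet it completes.
    public List<byte[]> Append(byte[] chunk)
    {
        _pending.AddRange(chunk);
        var packets = new List<byte[]>();
        while (_pending.Count >= 2)
        {
            int payloadLen = (_pending[0] << 8) | _pending[1];
            if (_pending.Count < 2 + payloadLen)
                break; // packet is incomplete, wait for more bytes
            packets.Add(_pending.GetRange(2, payloadLen).ToArray());
            _pending.RemoveRange(0, 2 + payloadLen);
        }
        return packets;
    }
}
```

Because TCP delivers a byte stream, one received chunk may hold part of a packet, exactly one packet, or several; the loop covers all three cases by keeping leftover bytes until the next call.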

netlistener: listening

using system;
using system.net;
using system.net.sockets;
using system.threading;
 
namespace iocpcore
{
 class netlistener
 {
  private socket listensocket;
  public listenparam _listenparam { get; set; }
  public event action<listenparam, asyncsocketclient> onacceptsocket;
 
  bool start;
 
  netserver _netserver;
  public netlistener(netserver netserver)
  {
   _netserver = netserver;
  }
 
  public int _acceptasynccount = 0;
  public bool startlisten()
  {
   try
   {
    start = true;
    ipendpoint listenpoint = new ipendpoint(ipaddress.parse("0.0.0.0"), _listenparam._port);
    listensocket = new socket(listenpoint.addressfamily, sockettype.stream, protocoltype.tcp);
    listensocket.bind(listenpoint);
    listensocket.listen(200);
 
    thread thread1 = new thread(new threadstart(netprocess));
    thread1.start();
    
    startaccept();
    return true;
   }
   catch (exception ex)
   {
    netlogger.log(string.format("**listen exception! {0}", ex.message));
    return false;
   }
  }
 
  autoresetevent _acceptevent = new autoresetevent(false);
  private void netprocess()
  {
   while (start)
   {
    dealnewaccept();
    _acceptevent.waitone(1000 * 10);
   }
  }
 
  private void dealnewaccept()
  {
   try
   {
    if(_acceptasynccount <= 10)
    {
     startaccept();
    }
 
    while (true)
    {
     asyncsocketclient client = _newsocketclientlist.getobj();
     if (client == null)
      break;
 
     dealnewaccept(client);
    }
   }
   catch (exception ex)
   {
    netlogger.log(string.format("dealnewaccept exception {0}***{1}", ex.message, ex.stacktrace));
   }
  }
 
  private void dealnewaccept(asyncsocketclient client)
  {
   client.sendbufferbytecount = _netserver.sendbufferbyteperclient;
   onacceptsocket?.invoke(_listenparam, client);
  }
 
  private void accepteventarg_completed(object sender, socketasynceventargs accepteventargs)
  {
   try
   {
    interlocked.decrement(ref _acceptasynccount);
    _acceptevent.set();
    accepteventargs.completed -= accepteventarg_completed;
    processaccept(accepteventargs);
   }
   catch (exception ex)
   {
    netlogger.log(string.format("accepteventarg_completed {0}***{1}", ex.message, ex.stacktrace));
   }
  }
 
  public bool startaccept()
  {
   socketasynceventargs accepteventargs = new socketasynceventargs();
   accepteventargs.completed += accepteventarg_completed;
 
   bool willraiseevent = listensocket.acceptasync(accepteventargs);
   interlocked.increment(ref _acceptasynccount);
 
   if (!willraiseevent)
   {
    interlocked.decrement(ref _acceptasynccount);
    _acceptevent.set();
    accepteventargs.completed -= accepteventarg_completed;
    processaccept(accepteventargs);
   }
   return true;
  }
 
  objectpool<asyncsocketclient> _newsocketclientlist = new objectpool<asyncsocketclient>();
  private void processaccept(socketasynceventargs accepteventargs)
  {
   try
   {
    using (accepteventargs)
    {
     if (accepteventargs.acceptsocket != null)
     {
      asyncsocketclient client = new asyncsocketclient(accepteventargs.acceptsocket);
      client.createclientinfo(this);
 
      _newsocketclientlist.putobj(client);
      _acceptevent.set();
     }
    }
   }
   catch (exception ex)
   {
    netlogger.log(string.format("processaccept {0}***{1}", ex.message, ex.stacktrace));
   }
  }
 }
}
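
One detail in `startaccept` is easy to miss: `AcceptAsync` returns `false` when the operation completed synchronously, and in that case the `Completed` event is not raised, so the result has to be processed inline. The standalone sketch below isolates that pattern. `AcceptLoop` and its members are hypothetical names written with conventional C# casing; the sketch listens on a loopback port chosen by the OS, only counts and closes accepted sockets, and for simplicity stops accepting on any error.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

// Hypothetical standalone sketch, not the library's netlistener.
public class AcceptLoop
{
    private readonly Socket _listen;
    private int _accepted;

    public int AcceptedCount => Volatile.Read(ref _accepted);
    public int Port => ((IPEndPoint)_listen.LocalEndPoint).Port;

    public AcceptLoop()
    {
        _listen = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _listen.Bind(new IPEndPoint(IPAddress.Loopback, 0)); // port 0: let the OS pick
        _listen.Listen(200);
        PostAccept();
    }

    public void Stop() => _listen.Close();

    private void PostAccept()
    {
        while (true)
        {
            var args = new SocketAsyncEventArgs();
            args.Completed += OnCompleted;
            bool pending;
            try
            {
                pending = _listen.AcceptAsync(args);
            }
            catch (ObjectDisposedException)
            {
                args.Dispose();
                return; // the listener was closed
            }
            if (pending)
                return; // Completed will be raised later
            // Completed synchronously: the event is NOT raised, so handle it here.
            if (!HandleAccept(args))
                return;
        }
    }

    private void OnCompleted(object sender, SocketAsyncEventArgs args)
    {
        if (HandleAccept(args))
            PostAccept();
    }

    // Returns false when accepting should stop.
    private bool HandleAccept(SocketAsyncEventArgs args)
    {
        using (args)
        {
            if (args.SocketError != SocketError.Success)
                return false;
            Interlocked.Increment(ref _accepted);
            args.AcceptSocket.Close(); // a real server would hand the socket on
            return true;
        }
    }
}
```

Handling the synchronous case in a loop rather than by recursion keeps the stack flat when many connections are already waiting in the backlog.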

netconnectmanage: connection handling

using system;
using system.net;
using system.net.sockets;

namespace iocpcore
{
 class netconnectmanage
 {
  public event action<socketeventparam, asyncsocketclient> onsocketconnectevent;

  public bool connectasyn(string peerip, int peerport, object tag)
  {
   try
   {
    socket socket = new socket(sockettype.stream, protocoltype.tcp);
    socketasynceventargs socketeventargs = new socketasynceventargs();
    socketeventargs.remoteendpoint = new ipendpoint(ipaddress.parse(peerip), peerport);
    socketeventargs.completed += socketconnect_completed;

    socketclientinfo clientinfo = new socketclientinfo();
    socketeventargs.usertoken = clientinfo;
    clientinfo.peerip = peerip;
    clientinfo.peerport = peerport;
    clientinfo.tag = tag;

    bool willraiseevent = socket.connectasync(socketeventargs);
    if (!willraiseevent)
    {
     processconnect(socketeventargs);
     socketeventargs.completed -= socketconnect_completed;
     socketeventargs.dispose();
    }
    return true;
   }
   catch (exception ex)
   {
    netlogger.log("connectasyn",ex);
    return false;
   }
  }

  private void socketconnect_completed(object sender, socketasynceventargs socketeventargs)
  {
   processconnect(socketeventargs);
   socketeventargs.completed -= socketconnect_completed;
   socketeventargs.dispose();
  }

  private void processconnect(socketasynceventargs socketeventargs)
  {
   socketclientinfo clientinfo = socketeventargs.usertoken as socketclientinfo;
   if (socketeventargs.socketerror == socketerror.success)
   {
    dealconnectsocket(socketeventargs.connectsocket, clientinfo);
   }
   else
   {
    socketeventparam socketparam = new socketeventparam(en_socketevent.connect, null);
    socketparam.clientinfo = clientinfo;
    onsocketconnectevent?.invoke(socketparam, null);
   }
  }


  void dealconnectsocket(socket socket, socketclientinfo clientinfo)
  {
   clientinfo.setclientinfo(socket);

   asyncsocketclient client = new asyncsocketclient(socket);
   client.setclientinfo(clientinfo);

   // raise the event
   socketeventparam socketparam = new socketeventparam(en_socketevent.connect, socket);
   socketparam.clientinfo = clientinfo;
   onsocketconnectevent?.invoke(socketparam, client);
  }

  public bool connect(string peerip, int peerport, object tag, out socket socket)
  {
   socket = null;
   try
   {
    socket sockettmp = new socket(sockettype.stream, protocoltype.tcp);

    socketclientinfo clientinfo = new socketclientinfo();
    clientinfo.peerip = peerip;
    clientinfo.peerport = peerport;
    clientinfo.tag = tag;

    endpoint remoteep = new ipendpoint(ipaddress.parse(peerip), peerport);
    sockettmp.connect(remoteep);
    if (!sockettmp.connected)
     return false;

    dealconnectsocket(sockettmp, clientinfo);
    socket = sockettmp;
    return true;
   }
   catch (exception ex)
   {
    netlogger.log(string.format("failed to connect to peer ({0}:{1})!", peerip, peerport), ex);
    return false;
   }
  }
 }
}

asyncsocketclient: socket send and receive handling

using system;
using system.collections.generic;
using system.diagnostics;
using system.net;
using system.net.sockets;

namespace iocpcore
{
 public class asyncsocketclient
 {
  public static int iocpreadlen = 1024;

  public readonly socket connectsocket;

  protected socketasynceventargs m_receiveeventargs;
  public socketasynceventargs receiveeventargs { get { return m_receiveeventargs; } set { m_receiveeventargs = value; } }
  protected byte[] m_asyncreceivebuffer;

  protected socketasynceventargs m_sendeventargs;
  public socketasynceventargs sendeventargs { get { return m_sendeventargs; } set { m_sendeventargs = value; } }
  protected byte[] m_asyncsendbuffer;

  public event action<asyncsocketclient, byte[]> onreaddata;
  public event action<asyncsocketclient, int> onsenddata;
  public event action<asyncsocketclient> onsocketclose;

  static object releaselock = new object();
  public static int createcount = 0;
  public static int releasecount = 0;

  ~asyncsocketclient()
  {
   lock (releaselock)
   {
    releasecount++;
   }
  }

  public asyncsocketclient(socket socket)
  {
   lock (releaselock)
   {
    createcount++;
   }

   connectsocket = socket;

   m_receiveeventargs = new socketasynceventargs();
   m_asyncreceivebuffer = new byte[iocpreadlen];
   m_receiveeventargs.acceptsocket = connectsocket;
   m_receiveeventargs.completed += receiveeventargs_completed;

   m_sendeventargs = new socketasynceventargs();
   m_asyncsendbuffer = new byte[iocpreadlen * 2];
   m_sendeventargs.acceptsocket = connectsocket;
   m_sendeventargs.completed += sendeventargs_completed;
  }

  socketclientinfo _clientinfo;

  public socketclientinfo clientinfo
  {
   get
   {
    return _clientinfo;
   }
  }

  internal void createclientinfo(netlistener netlistener)
  {
   _clientinfo = new socketclientinfo();
   try
   {
    _clientinfo.tag = netlistener._listenparam._tag;
    ipendpoint ip = connectsocket.localendpoint as ipendpoint;
    debug.assert(netlistener._listenparam._port == ip.port);

    _clientinfo.localip = ip.address.tostring();
    _clientinfo.localport = netlistener._listenparam._port;

    ip = connectsocket.remoteendpoint as ipendpoint;
    _clientinfo.peerip = ip.address.tostring();
    _clientinfo.peerport = ip.port;
   }
   catch (exception ex)
   {
    netlogger.log("createclientinfo", ex);
   }
  }
  internal void setclientinfo(socketclientinfo clientinfo)
  {
   _clientinfo = clientinfo;
  }

  #region read process
  bool _inreadpending = false;
  public en_socketreadresult readnextdata()
  {
   lock (this)
   {
    if (_socketerror)
     return en_socketreadresult.readerror;
    if (_inreadpending)
     return en_socketreadresult.inasyn;
    if(!connectsocket.connected)
    {
     onreaderror();
     return en_socketreadresult.readerror;
    }

    try
    {
     m_receiveeventargs.setbuffer(m_asyncreceivebuffer, 0, m_asyncreceivebuffer.length);
     _inreadpending = true;
     bool willraiseevent = connectsocket.receiveasync(receiveeventargs); // post the receive request
     if (!willraiseevent)
     {
      _inreadpending = false;
      processreceive();
      if (_socketerror)
      {
       onreaderror();
       return en_socketreadresult.readerror;
      }
      return en_socketreadresult.haveread;
     }
     else
     {
      return en_socketreadresult.inasyn;
     }
    }
    catch (exception ex)
    {
     netlogger.log("readnextdata", ex);
     _inreadpending = false;
     onreaderror();
     return en_socketreadresult.readerror;
    }
   }
  }

  private void processreceive()
  {
   if (receiveeventargs.bytestransferred > 0
    && receiveeventargs.socketerror == socketerror.success)
   {
    int offset = receiveeventargs.offset;
    int count = receiveeventargs.bytestransferred;

    byte[] readdata = new byte[count];
    array.copy(m_asyncreceivebuffer, offset, readdata, 0, count);

    _inreadpending = false;
    if (!_socketerror)
     onreaddata?.invoke(this, readdata);
   }
   else
   {
    _inreadpending = false;
    onreaderror();
   }
  }

  private void receiveeventargs_completed(object sender, socketasynceventargs e)
  {
   lock (this)
   {
    _inreadpending = false;
    processreceive();
    if (_socketerror)
    {
     onreaderror();
    }
   }
  }

  bool _socketerror = false;
  private void onreaderror()
  {
   lock (this)
   {
    if (_socketerror == false)
    {
     _socketerror = true;
     onsocketclose?.invoke(this);
    }
    closeclient();
   }
  }
  #endregion

  #region send process
  int _sendbufferbytecount = 102400;
  public int sendbufferbytecount
  {
   get
   {
    return _sendbufferbytecount;
   }
   set
   {
    if (value < 1024)
    {
     _sendbufferbytecount = 1024;
    }
    else
    {
     _sendbufferbytecount = value;
    }
   }
  }

  sendbufferpool _senddatapool = new sendbufferpool();
  internal en_senddataresult putsenddata(byte[] data)
  {
   if (_socketerror)
    return en_senddataresult.no_client;

   if (_senddatapool._bufferbytecount >= _sendbufferbytecount)
   {
    return en_senddataresult.buffer_overflow;
   }

   if (data.length <= iocpreadlen)
   {
    _senddatapool.putobj(data);
   }
   else
   {
    list<byte[]> dataitems = splitdata(data, iocpreadlen);
    foreach (byte[] item in dataitems)
    {
     _senddatapool.putobj(item);
    }
   }

   return en_senddataresult.ok;
  }

  bool _insendpending = false;
  public en_socketsendresult sendnextdata()
  {
   lock (this)
   {
    if (_socketerror)
    {
     return en_socketsendresult.senderror;
    }

    if (_insendpending)
    {
     return en_socketsendresult.inasyn;
    }

    int sendbytecount = getsenddata();
    if (sendbytecount == 0)
    {
     return en_socketsendresult.nosenddata;
    }

    // check the connection first to avoid throwing an exception, which would hurt performance
    if (!connectsocket.connected)
    {
     onsenderror();
     return en_socketsendresult.senderror;
    }

    try
    {
     m_sendeventargs.setbuffer(m_asyncsendbuffer, 0, sendbytecount);
     _insendpending = true;
     bool willraiseevent = connectsocket.sendasync(m_sendeventargs);
     if (!willraiseevent)
     {
      _insendpending = false;
      processsend(m_sendeventargs);
      if (_socketerror)
      {
       onsenderror();
       return en_socketsendresult.senderror;
      }
      else
      {
       onsenddata?.invoke(this, sendbytecount);
       // go on to send the next item
       return en_socketsendresult.havesend;
      }
     }
     else
     {
      return en_socketsendresult.inasyn;
     }
    }
    catch (exception ex)
    {
     netlogger.log("sendnextdata", ex);
     _insendpending = false;
     onsenderror();
     return en_socketsendresult.senderror;
    }
   }
  }

  private void sendeventargs_completed(object sender, socketasynceventargs sendeventargs)
  {
   lock (this)
   {
    try
    {
     _insendpending = false;
     processsend(m_sendeventargs);

     int sendcount = 0;
     if (sendeventargs.socketerror == socketerror.success)
     {
      sendcount = sendeventargs.bytestransferred;
     }
     onsenddata?.invoke(this, sendcount);

     if (_socketerror)
     {
      onsenderror();
     }
    }
    catch (exception ex)
    {
     netlogger.log("sendeventargs_completed", ex);
    }
   }
  }

  private bool processsend(socketasynceventargs sendeventargs)
  {
   if (sendeventargs.socketerror == socketerror.success)
   {
    return true;
   }
   else
   {
    onsenderror();
    return false;
   }
  }

  private int getsenddata()
  {
   int datalen = 0;
   while (true)
   {
    byte[] data = _senddatapool.getobj();
    if (data == null)
     return datalen;
    array.copy(data, 0, m_asyncsendbuffer, datalen, data.length);
    datalen += data.length;
    if (datalen > iocpreadlen)
     break;
   }
   return datalen;
  }
  private void onsenderror()
  {
   lock (this)
   {
    if (_socketerror == false)
    {
     _socketerror = true;
     onsocketclose?.invoke(this);
    }
    closeclient();
   }
  }
  #endregion

  internal void closesocket()
  {
   try
   {
    connectsocket.close();
   }
   catch (exception ex)
   {
    netlogger.log("closesocket", ex);
   }
  }

  static object socketcloselock = new object();
  public static int closesendcount = 0;
  public static int closereadcount = 0;

  bool _disposesend = false;
  void closesend()
  {
   if (!_disposesend && !_insendpending)
   {
    lock (socketcloselock)
     closesendcount++;

    _disposesend = true;
    m_sendeventargs.setbuffer(null, 0, 0);
    m_sendeventargs.completed -= sendeventargs_completed;
    m_sendeventargs.dispose();
   }
  }

  bool _disposeread = false;
  void closeread()
  {
   if (!_disposeread && !_inreadpending)
   {
    lock (socketcloselock)
     closereadcount++;

    _disposeread = true;
    m_receiveeventargs.setbuffer(null, 0, 0);
    m_receiveeventargs.completed -= receiveeventargs_completed;
    m_receiveeventargs.dispose();
   }
  }
  private void closeclient()
  {
   try
   {
    closesend();
    closeread();
    connectsocket.close();
   }
   catch (exception ex)
   {
    netlogger.log("closeclient", ex);
   }
  }

  // split data into chunks of at most maxlen bytes
  private list<byte[]> splitdata(byte[] data, int maxlen)
  {
   list<byte[]> items = new list<byte[]>();

   int start = 0;
   while (true)
   {
    int itemlen = math.min(maxlen, data.length - start);
    if (itemlen == 0)
     break;
    byte[] item = new byte[itemlen];
    array.copy(data, start, item, 0, itemlen);
    items.add(item);

    start += itemlen;
   }
   return items;
  }
 }

 public enum en_socketreadresult
 {
  inasyn,
  haveread,
  readerror
 }

 public enum en_socketsendresult
 {
  inasyn,
  havesend,
  nosenddata,
  senderror
 }

 class sendbufferpool
 {
  objectpool<byte[]> _bufferpool = new objectpool<byte[]>();

  public int64 _bufferbytecount = 0;
  public bool putobj(byte[] obj)
  {
   if (_bufferpool.putobj(obj))
   {
    lock (this)
    {
     _bufferbytecount += obj.length;
    }
    return true;
   }
   else
   {
    return false;
   }
  }

  public byte[] getobj()
  {
   byte[] result = _bufferpool.getobj();
   if (result != null)
   {
    lock (this)
    {
     _bufferbytecount -= result.length;
    }
   }
   return result;
  }
 }
}
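
The send-side coalescing done by `splitdata` and `getsenddata` can be isolated into two small functions. The sketch below is an illustration of the idea, not the library's code: `SendBatching`, `Split` and `Fill` are hypothetical names, and a plain `Queue<byte[]>` stands in for `sendbufferpool`. One deliberate difference is that `Fill` stops before a chunk that would not fit, whereas the listing above stops once more than `iocpreadlen` bytes have been gathered and relies on the send buffer being twice that size.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helpers illustrating the idea behind splitdata/getsenddata.
public static class SendBatching
{
    // Splits data into chunks of at most maxLen bytes.
    public static List<byte[]> Split(byte[] data, int maxLen)
    {
        if (maxLen <= 0) throw new ArgumentOutOfRangeException(nameof(maxLen));
        var chunks = new List<byte[]>();
        for (int start = 0; start < data.Length; start += maxLen)
        {
            int len = Math.Min(maxLen, data.Length - start);
            var chunk = new byte[len];
            Buffer.BlockCopy(data, start, chunk, 0, len);
            chunks.Add(chunk);
        }
        return chunks;
    }

    // Moves queued chunks into sendBuffer while the next chunk still fits.
    // Returns the number of bytes written.
    public static int Fill(Queue<byte[]> pending, byte[] sendBuffer)
    {
        int written = 0;
        while (pending.Count > 0 && written + pending.Peek().Length <= sendBuffer.Length)
        {
            byte[] chunk = pending.Dequeue();
            Buffer.BlockCopy(chunk, 0, sendBuffer, written, chunk.Length);
            written += chunk.Length;
        }
        return written;
    }
}
```

Splitting first guarantees that no queued chunk is larger than the send buffer, so `Fill` always makes progress; several small messages then leave in a single `SendAsync` call instead of one call each.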

netserver: aggregating the other classes

using system;
using system.collections.generic;
using system.diagnostics;
using system.linq;
using system.net.sockets;
using system.threading;

namespace iocpcore
{
 public class netserver
 {
  public action<socketeventparam> onsocketpacketevent;

  // send buffer size per connection, in bytes
  public int sendbufferbyteperclient { get; set; } = 1024 * 100;

  bool _serverstart = false;
  list<netlistener> _listlistener = new list<netlistener>();

  // assembles received byte streams into complete packets
  clientpacketmanage _clientpacketmanage;

  public int64 sendbytecount { get; set; }
  public int64 readbytecount { get; set; }

  list<listenparam> _listlistenport = new list<listenparam>();
  public void addlistenport(int port, object tag)
  {
   _listlistenport.add(new listenparam(port, tag));
  }
  /// <summary>
  /// Starts listening on every port added through addlistenport.
  /// </summary>
  /// <param name="listenfault">ports on which listening failed</param>
  /// <returns></returns>
  public bool startlisten(out list<int> listenfault)
  {
   _serverstart = true;

   _clientpacketmanage = new clientpacketmanage(this);
   _clientpacketmanage.onsocketpacketevent += putclientpacket;

   _netconnectmanage.onsocketconnectevent += socketconnectevent;

   _listlistener.clear();
   thread thread1 = new thread(new threadstart(netpacketprocess));
   thread1.start();

   thread thread2 = new thread(new threadstart(netsendprocess));
   thread2.start();

   thread thread3 = new thread(new threadstart(netreadprocess));
   thread3.start();

   listenfault = new list<int>();
   foreach (listenparam param in _listlistenport)
   {
    netlistener listener = new netlistener(this);
    listener._listenparam = param;
    listener.onacceptsocket += listener_onacceptsocket;
    if (!listener.startlisten())
    {
     listenfault.add(param._port);
    }
    else
    {
     _listlistener.add(listener);
     netlogger.log(string.format("listening started! port: {0}", param._port));
    }
   }

   return listenfault.count == 0;
  }

  public void putclientpacket(socketeventparam param)
  {
   onsocketpacketevent?.invoke(param);
  }

  // minimum and maximum packet length
  int _packetminlen;
  int _packetmaxlen;
  public int packetminlen
  {
   get { return _packetminlen; }
  }
  public int packetmaxlen
  {
   get { return _packetmaxlen; }
  }

  /// <summary>
  /// Sets the minimum and maximum packet length.
  /// When minlen = 0, incoming data is treated as a byte stream.
  /// </summary>
  /// <param name="minlen"></param>
  /// <param name="maxlen"></param>
  public void setpacketparam(int minlen, int maxlen)
  {
   debug.assert(minlen >= 0);
   debug.assert(maxlen > minlen);
   _packetminlen = minlen;
   _packetmaxlen = maxlen;
  }

  // callback that returns the total length of a packet
  public delegate int delegate_getpackettotallen(byte[] data, int offset);
  public delegate_getpackettotallen getpackettotallen_callback;

  objectpoolwithevent<socketeventparam> _socketeventpool = new objectpoolwithevent<socketeventparam>();
  private void netpacketprocess()
  {
   while (_serverstart)
   {
    try
    {
     dealeventpool();
    }
    catch (exception ex)
    {
     netlogger.log(string.format("dealeventpool exception {0}***{1}", ex.message, ex.stacktrace));
    }
    _socketeventpool.waitone(1000);
   }
  }

  dictionary<socket, asyncsocketclient> _clientgroup = new dictionary<socket, asyncsocketclient>();
  public int clientcount
  {
   get
   {
    lock (_clientgroup)
    {
     return _clientgroup.count;
    }
   }
  }
  public list<socket> clientlist
  {
   get
   {
    lock (_clientgroup)
    {
     return _clientgroup.keys.tolist();
    }
   }
  }

  private void dealeventpool()
  {
   while (true)
   {
    socketeventparam param = _socketeventpool.getobj();
    if (param == null)
     return;

    if (param.socketevent == en_socketevent.close)
    {
     lock (_clientgroup)
     {
      _clientgroup.remove(param.socket);
     }
    }

    if (_packetminlen == 0) // byte-stream mode
    {
     onsocketpacketevent?.invoke(param);
    }
    else
    {
     // assemble complete packets
     _clientpacketmanage.putsocketparam(param);
    }
   }
  }

  private void socketconnectevent(socketeventparam param, asyncsocketclient client)
  {
   try
   {
    if (param.socket == null || client == null) // connection failed
    {
     
    }
    else
    {
     lock (_clientgroup)
     {
      bool remove = _clientgroup.remove(client.connectsocket);
      debug.assert(!remove);
      _clientgroup.add(client.connectsocket, client);
     }

     client.onsocketclose += client_onsocketclose;
     client.onreaddata += client_onreaddata;
     client.onsenddata += client_onsenddata;

     _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));
    }
    _socketeventpool.putobj(param);
   }
   catch (exception ex)
   {
    netlogger.log(string.format("socketconnectevent exception {0}***{1}", ex.message, ex.stacktrace));
   }
  }

  internal void onrcvpacketlenerror(socket socket, byte[] buffer, int offset, int packetlen)
  {
   try
   {
    lock (_clientgroup)
    {
     if (!_clientgroup.containskey(socket))
     {
      debug.assert(false);
      return;
     }

     netlogger.log(string.format("invalid packet length! length: {0}", packetlen));
     asyncsocketclient client = _clientgroup[socket];
     client.closesocket();
    }
   }
   catch (exception ex)
   {
    netlogger.log(string.format("onrcvpacketlenerror exception {0}***{1}", ex.message, ex.stacktrace));
   }
  }

  #region listen port
  private void listener_onacceptsocket(listenparam listenpatam, asyncsocketclient client)
  {
   try
   {
    lock (_clientgroup)
    {
     bool remove = _clientgroup.remove(client.connectsocket);
     debug.assert(!remove);
     _clientgroup.add(client.connectsocket, client);
    }

    client.onsocketclose += client_onsocketclose;
    client.onreaddata += client_onreaddata;
    client.onsenddata += client_onsenddata;

    _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));

    socketeventparam param = new socketeventparam(en_socketevent.accept, client.connectsocket);
    param.clientinfo = client.clientinfo;

    _socketeventpool.putobj(param);
   }
   catch (exception ex)
   {
    netlogger.log(string.format("listener_onacceptsocket exception {0}***{1}", ex.message, ex.stacktrace));
   }
  }


  objectpoolwithevent<socketeventdeal> _listsendevent = new objectpoolwithevent<socketeventdeal>();
  private void netsendprocess()
  {
   while (true)
   {
    dealsendevent();
    _listsendevent.waitone(1000);
   }
  }

  objectpoolwithevent<socketeventdeal> _listreadevent = new objectpoolwithevent<socketeventdeal>();
  private void netreadprocess()
  {
   while (true)
   {
    dealreadevent();
    _listreadevent.waitone(1000);
   }
  }

  
  private void dealsendevent()
  {
   while (true)
   {
    socketeventdeal item = _listsendevent.getobj();
    if (item == null)
     break;
    switch (item.socketevent)
    {
     case en_socketdealevent.send:
      {
       while (true)
       {
        en_socketsendresult result = item.client.sendnextdata();
        if (result == en_socketsendresult.havesend)
         continue;
        else
         break;
       }
      }
      break;
     case en_socketdealevent.read:
      {
       debug.assert(false);
      }
      break;     
    }
   }
  }

  private void dealreadevent()
  {
   while (true)
   {
    socketeventdeal item = _listreadevent.getobj();
    if (item == null)
     break;
    switch (item.socketevent)
    {
     case en_socketdealevent.read:
      {
       while (true)
       {
        en_socketreadresult result = item.client.readnextdata();
        if (result == en_socketreadresult.haveread)
         continue;
        else
         break;
       }
      }
      break;
     case en_socketdealevent.send:
      {
       debug.assert(false);
      }
      break;
    }
   }
  }

  private void client_onreaddata(asyncsocketclient client, byte[] readdata)
  {
   // queue the next read
   _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));

   try
   {
    socketeventparam param = new socketeventparam(en_socketevent.read, client.connectsocket);
    param.clientinfo = client.clientinfo;
    param.data = readdata;
    _socketeventpool.putobj(param);

    lock (this)
    {
     readbytecount += readdata.length;
    }
   }
   catch (exception ex)
   {
    netlogger.log(string.format("client_onreaddata exception {0}***{1}", ex.message, ex.stacktrace));
   }
  }
#endregion

  private void client_onsenddata(asyncsocketclient client, int sendcount)
  {
   // queue the next send
   _listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send));
   lock (this)
   {
    sendbytecount += sendcount;
   }
  }

  private void client_onsocketclose(asyncsocketclient client)
  {
   try
   {
    socketeventparam param = new socketeventparam(en_socketevent.close, client.connectsocket);
    param.clientinfo = client.clientinfo;
    _socketeventpool.putobj(param);
   }
   catch (exception ex)
   {
    netlogger.log(string.format("client_onsocketclose exception {0}***{1}", ex.message, ex.stacktrace));
   }
  }

  /// <summary>
  /// Queues data in the connection's send buffer.
  /// </summary>
  /// <param name="socket"></param>
  /// <param name="data"></param>
  /// <returns></returns>
  public en_senddataresult senddata(socket socket, byte[] data)
  {
   if (socket == null)
    return en_senddataresult.no_client;
   lock (_clientgroup)
   {
    if (!_clientgroup.containskey(socket))
     return en_senddataresult.no_client;
    asyncsocketclient client = _clientgroup[socket];
    en_senddataresult result = client.putsenddata(data);
    if (result == en_senddataresult.ok)
    {
     // queue the next send
     _listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send));     
    }
    return result;
   }
  }

  /// <summary>
  /// Sets the send buffer size of one connection.
  /// </summary>
  /// <param name="socket"></param>
  /// <param name="bytecount"></param>
  /// <returns></returns>
  public bool setclientsendbuffer(socket socket, int bytecount)
  {
   lock (_clientgroup)
   {
    if (!_clientgroup.containskey(socket))
     return false;
    asyncsocketclient client = _clientgroup[socket];
    client.sendbufferbytecount = bytecount;
    return true;
   }
  }


  #region connect process
  netconnectmanage _netconnectmanage = new netconnectmanage();
  /// <summary>
  /// Connects to a peer asynchronously.
  /// </summary>
  /// <param name="peerip"></param>
  /// <param name="peerport"></param>
  /// <param name="tag"></param>
  /// <returns></returns>
  public bool connectasyn(string peerip, int peerport, object tag)
  {
   return _netconnectmanage.connectasyn(peerip, peerport, tag);
  }

  /// <summary>
  /// Connects to a peer synchronously.
  /// </summary>
  /// <param name="peerip"></param>
  /// <param name="peerport"></param>
  /// <param name="tag"></param>
  /// <param name="socket"></param>
  /// <returns></returns>
  public bool connect(string peerip, int peerport, object tag, out socket socket)
  {
   return _netconnectmanage.connect(peerip, peerport, tag, out socket);
  }
  #endregion
 }

 enum en_socketdealevent
 {
  read,
  send,
 }
 class socketeventdeal
 {
  public asyncsocketclient client { get; set; }
  public en_socketdealevent socketevent { get; set; }
  public socketeventdeal(asyncsocketclient client, en_socketdealevent socketevent)
  {
   this.client = client;
   this.socketevent = socketevent;
  }
 }
}
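
The listings rely on `objectpool<T>` and `objectpoolwithevent<T>`, whose source is not included in this article. Judging only from how they are called (`putobj` returning a `bool`, `getobj` returning `null` when empty, and `waitone` taking a millisecond timeout), they could look roughly like the sketch below. Everything in it is an assumption, written with conventional C# casing; the real classes may differ.

```csharp
using System.Collections.Generic;
using System.Threading;

// Assumed shape: a thread-safe FIFO whose GetObj returns null when empty.
public class ObjectPool<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();

    public virtual bool PutObj(T obj)
    {
        if (obj == null) return false;
        lock (_items) { _items.Enqueue(obj); }
        return true;
    }

    public T GetObj()
    {
        lock (_items) { return _items.Count > 0 ? _items.Dequeue() : null; }
    }
}

// Assumed shape: the same queue plus a signal that wakes a waiting consumer.
public class ObjectPoolWithEvent<T> : ObjectPool<T> where T : class
{
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);

    public override bool PutObj(T obj)
    {
        bool added = base.PutObj(obj);
        if (added) _signal.Set();
        return added;
    }

    // Blocks until an item has been queued or the timeout elapses.
    public bool WaitOne(int millisecondsTimeout) => _signal.WaitOne(millisecondsTimeout);
}
```

A consumer thread would drain the queue with `GetObj` until it returns `null` and only then call `WaitOne`, which is the shape of `netpacketprocess`, `netsendprocess` and `netreadprocess` above. The timeout acts as a safety net in case a signal is missed.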

Using the library

Using it is very simple. An example follows:

using iocpcore;
using system;
using system.collections.generic;
using system.linq;
using system.net.sockets;
using system.text;
using system.threading.tasks;
using system.windows;

namespace warningclient
{
 public class socketserver
 {
  public action<socketeventparam> onsocketevent;

  public int64 sendbytecount
  {
   get
   {
    if (_netserver == null)
     return 0;
    return _netserver.sendbytecount;
   }
  }
  public int64 readbytecount
  {
   get
   {
    if (_netserver == null)
     return 0;
    return _netserver.readbytecount;
   }
  }

  netserver _netserver;
  en_packettype _packettype = en_packettype.bytestream;
  public void setpackttype(en_packettype packettype)
  {
   _packettype = packettype;
   if (_netserver == null)
    return;
   if (packettype == en_packettype.bytestream)
   {
    _netserver.setpacketparam(0, 1024);
   }
   else
   {
    _netserver.setpacketparam(9, 1024);
   }
  }

  public bool init(list<int> listenport)
  {
   netlogger.onlogevent += netlogger_onlogevent;
   _netserver = new netserver();
   setpackttype(_packettype);
   _netserver.getpackettotallen_callback += getpackettotallen;
   _netserver.onsocketpacketevent += socketpacketdeal;

   foreach (int n in listenport)
   {
    _netserver.addlistenport(n, n);
   }

   list<int> listenfault;
   bool start = _netserver.startlisten(out listenfault);
   return start;
  }

  int getpackettotallen(byte[] data, int offset)
  {
   if (mainwindow._packettype == en_packettype.znss)
    return getpacketznss(data, offset);
   else
    return getpacketanzhiyuan(data, offset);
  }

  int getpacketanzhiyuan(byte[] data, int offset)
  {
   int n = data[offset + 5] + 6;
   return n;
  }

  int getpacketznss(byte[] data, int offset)
  {
   int packetlen = (int)(data[offset + 4]) + 5;
   return packetlen;
  }


  public bool connectasyn(string peerip, int peerport, object tag)
  {
   return _netserver.connectasyn(peerip, peerport, tag);
  }

  public bool connect(string peerip, int peerport, object tag, out socket socket)
  {
   return _netserver.connect(peerip, peerport, tag, out socket);
  }

  private void netlogger_onlogevent(string message)
  {
   applog.log(message);
  }

  dictionary<socket, socketeventparam> _clientgroup = new dictionary<socket, socketeventparam>();

  public int clientcount
  {
   get
   {
    lock (_clientgroup)
    {
     return _clientgroup.count;
    }
   }
  }
  public list<socket> clientlist
  {
   get
   {
    if (_netserver != null)
     return _netserver.clientlist;
    return new list<socket>();
   }
  }
  void addclient(socketeventparam socketparam)
  {
   lock (_clientgroup)
   {
    _clientgroup.remove(socketparam.socket);
    _clientgroup.add(socketparam.socket, socketparam);
   }
  }

  void removeclient(socketeventparam socketparam)
  {
   lock (_clientgroup)
   {
    _clientgroup.remove(socketparam.socket);
   }
  }

  objectpool<socketeventparam> _readdatapool = new objectpool<socketeventparam>();

  public objectpool<socketeventparam> readdatapool
  {
   get
   {
    return _readdatapool;
   }
  }

  private void socketpacketdeal(socketeventparam socketparam)
  {
   onsocketevent?.invoke(socketparam);
   if (socketparam.socketevent == en_socketevent.read)
   {
    if (mainwindow._isshowreadpacket)
     _readdatapool.putobj(socketparam);
   }
   else if (socketparam.socketevent == en_socketevent.accept)
   {
    addclient(socketparam);
    string peerip = socketparam.clientinfo.peeripport;
    applog.log(string.format("client connected! local port: {0}, peer: {1}",
     socketparam.clientinfo.localport, peerip));
   }
   else if (socketparam.socketevent == en_socketevent.connect)
   {
    string peerip = socketparam.clientinfo.peeripport;
    if (socketparam.socket != null)
    {
     addclient(socketparam);

     applog.log(string.format("connected to peer! local port: {0}, peer: {1}",
      socketparam.clientinfo.localport, peerip));
    }
    else
    {
     applog.log(string.format("failed to connect to peer! local port: {0}, peer: {1}",
      socketparam.clientinfo.localport, peerip));
    }
   }
   else if (socketparam.socketevent == en_socketevent.close)
   {
    mainwindow.mainwnd.onsocketdisconnect(socketparam.socket);
    removeclient(socketparam);
    string peerip = socketparam.clientinfo.peeripport;
    applog.log(string.format("client disconnected! local port: {0}, peer: {1}",
     socketparam.clientinfo.localport, peerip));
   }
  }

  public en_senddataresult senddata(socket socket, byte[] data)
  {
   if(socket == null)
   {
    messagebox.show("not connected yet!");
    return en_senddataresult.no_client;
   }
   return _netserver.senddata(socket, data);
  }

  internal void sendtoall(byte[] data)
  {
   lock (_clientgroup)
   {
    foreach (socket socket in _clientgroup.keys)
    {
     senddata(socket, data);
    }
   }
  }
 }
}
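
The two length callbacks in the example index straight into `data`. The article does not say whether the library guarantees that the whole header has arrived before calling them (the minimum length passed to `setpacketparam` presumably serves that purpose), so a defensive variant might check bounds first. `PacketLength.TryGet` below is a hypothetical helper; it assumes, as both example protocols appear to, that one byte at a fixed position in the header holds the payload length.

```csharp
using System;

// Hypothetical helper, not part of the library.
public static class PacketLength
{
    // lengthIndex: position of the length byte relative to offset.
    // headerLen:   number of bytes added to the payload length to get the packet length.
    public static bool TryGet(byte[] data, int offset, int lengthIndex, int headerLen, out int totalLen)
    {
        totalLen = 0;
        if (data == null || offset < 0 || lengthIndex < 0)
            return false;
        if ((long)offset + lengthIndex >= data.Length)
            return false; // the length byte has not arrived yet
        totalLen = data[offset + lengthIndex] + headerLen;
        return true;
    }
}
```

With this helper, the first protocol in the example would call `TryGet(data, offset, 5, 6, out n)` and the second `TryGet(data, offset, 4, 5, out n)`, treating a `false` result as "wait for more data".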
