C#中一个高性能异步socket封装库的实现思路分享
前言
socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多中方法,其中效率最高就是异步通讯。
异步通讯实际是利用windows完成端口(iocp)来处理的,关于完成端口实现原理,大家可以参考网上文章。
我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!
异步通讯比同步通讯处理要难很多,代码编写中会遇到许多“坑“。如果没有经验,很难完成。
我搜集了大量资料,完成了对异步socket的封装。此库已用稳定高效的运行几个月。
纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。
在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。
为了使大家对通讯效率有初步了解,先看测试图。
主机配置情况
百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。
这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。
库的结构图
目标
即可作为服务端(监听)也可以作为客户端(主动连接)使用。
可以适应任何网络协议。收发的数据针对字节流或一个完整的包。对协议内容不做处理。
高可用性。将复杂的底层处理封装,对外接口非常友好。
高性能。最大限度优化处理。单机可支持数万连接,收发速度可达几百兆bit。
实现思路
网络处理逻辑可以分为以下几个部分:
网络监听 可以在多个端口实现监听。负责生成socket,生成的socket供后续处理。监听模块功能比较单一,如有必要,可对监听模块做进一步优化。
主动连接 可以异步或同步的连接对方。连接成功后,对socket的后续处理,与监听得到的socket完全一样。注:无论是监听得到的socket,还是连接得到的socket,后续处理完全一样。
socket收发处理 每个socket对应一个收发实例,socket收发只针对字节流处理。收发时,做了优化。比如发送时,对数据做了沾包,提高发送性能;接收时,一次投递1k的数据。
组包处理 一般数据包都有包长度指示;比如 报头的前俩个字节表示长度,根据这个值就可以组成一个完整的包。
netlistener 监听
using system; using system.net; using system.net.sockets; using system.threading; namespace iocpcore { class netlistener { private socket listensocket; public listenparam _listenparam { get; set; } public event action<listenparam, asyncsocketclient> onacceptsocket; bool start; netserver _netserver; public netlistener(netserver netserver) { _netserver = netserver; } public int _acceptasynccount = 0; public bool startlisten() { try { start = true; ipendpoint listenpoint = new ipendpoint(ipaddress.parse("0.0.0.0"), _listenparam._port); listensocket = new socket(listenpoint.addressfamily, sockettype.stream, protocoltype.tcp); listensocket.bind(listenpoint); listensocket.listen(200); thread thread1 = new thread(new threadstart(netprocess)); thread1.start(); startaccept(); return true; } catch (exception ex) { netlogger.log(string.format("**监听异常!{0}", ex.message)); return false; } } autoresetevent _acceptevent = new autoresetevent(false); private void netprocess() { while (start) { dealnewaccept(); _acceptevent.waitone(1000 * 10); } } private void dealnewaccept() { try { if(_acceptasynccount <= 10) { startaccept(); } while (true) { asyncsocketclient client = _newsocketclientlist.getobj(); if (client == null) break; dealnewaccept(client); } } catch (exception ex) { netlogger.log(string.format("dealnewaccept 异常 {0}***{1}", ex.message, ex.stacktrace)); } } private void dealnewaccept(asyncsocketclient client) { client.sendbufferbytecount = _netserver.sendbufferbyteperclient; onacceptsocket?.invoke(_listenparam, client); } private void accepteventarg_completed(object sender, socketasynceventargs accepteventargs) { try { interlocked.decrement(ref _acceptasynccount); _acceptevent.set(); accepteventargs.completed -= accepteventarg_completed; processaccept(accepteventargs); } catch (exception ex) { netlogger.log(string.format("accepteventarg_completed {0}***{1}", ex.message, ex.stacktrace)); } } public bool startaccept() { socketasynceventargs accepteventargs = new socketasynceventargs(); accepteventargs.completed += accepteventarg_completed; bool willraiseevent = listensocket.acceptasync(accepteventargs); interlocked.increment(ref _acceptasynccount); if (!willraiseevent) { interlocked.decrement(ref _acceptasynccount); _acceptevent.set(); accepteventargs.completed -= accepteventarg_completed; processaccept(accepteventargs); } return true; } objectpool<asyncsocketclient> _newsocketclientlist = new objectpool<asyncsocketclient>(); private void processaccept(socketasynceventargs accepteventargs) { try { using (accepteventargs) { if (accepteventargs.acceptsocket != null) { asyncsocketclient client = new asyncsocketclient(accepteventargs.acceptsocket); client.createclientinfo(this); _newsocketclientlist.putobj(client); _acceptevent.set(); } } } catch (exception ex) { netlogger.log(string.format("processaccept {0}***{1}", ex.message, ex.stacktrace)); } } } }
netconnectmanage连接处理
using system; using system.net; using system.net.sockets; namespace iocpcore { class netconnectmanage { public event action<socketeventparam, asyncsocketclient> onsocketconnectevent; public bool connectasyn(string peerip, int peerport, object tag) { try { socket socket = new socket(sockettype.stream, protocoltype.tcp); socketasynceventargs socketeventargs = new socketasynceventargs(); socketeventargs.remoteendpoint = new ipendpoint(ipaddress.parse(peerip), peerport); socketeventargs.completed += socketconnect_completed; socketclientinfo clientinfo = new socketclientinfo(); socketeventargs.usertoken = clientinfo; clientinfo.peerip = peerip; clientinfo.peerport = peerport; clientinfo.tag = tag; bool willraiseevent = socket.connectasync(socketeventargs); if (!willraiseevent) { processconnect(socketeventargs); socketeventargs.completed -= socketconnect_completed; socketeventargs.dispose(); } return true; } catch (exception ex) { netlogger.log("connectasyn",ex); return false; } } private void socketconnect_completed(object sender, socketasynceventargs socketeventargs) { processconnect(socketeventargs); socketeventargs.completed -= socketconnect_completed; socketeventargs.dispose(); } private void processconnect(socketasynceventargs socketeventargs) { socketclientinfo clientinfo = socketeventargs.usertoken as socketclientinfo; if (socketeventargs.socketerror == socketerror.success) { dealconnectsocket(socketeventargs.connectsocket, clientinfo); } else { socketeventparam socketparam = new socketeventparam(en_socketevent.connect, null); socketparam.clientinfo = clientinfo; onsocketconnectevent?.invoke(socketparam, null); } } void dealconnectsocket(socket socket, socketclientinfo clientinfo) { clientinfo.setclientinfo(socket); asyncsocketclient client = new asyncsocketclient(socket); client.setclientinfo(clientinfo); //触发事件 socketeventparam socketparam = new socketeventparam(en_socketevent.connect, socket); socketparam.clientinfo = clientinfo; onsocketconnectevent?.invoke(socketparam, client); } public bool connect(string peerip, int peerport, object tag, out socket socket) { socket = null; try { socket sockettmp = new socket(sockettype.stream, protocoltype.tcp); socketclientinfo clientinfo = new socketclientinfo(); clientinfo.peerip = peerip; clientinfo.peerport = peerport; clientinfo.tag = tag; endpoint remoteep = new ipendpoint(ipaddress.parse(peerip), peerport); sockettmp.connect(remoteep); if (!sockettmp.connected) return false; dealconnectsocket(sockettmp, clientinfo); socket = sockettmp; return true; } catch (exception ex) { netlogger.log(string.format("连接对方:({0}:{1})出错!", peerip, peerport), ex); return false; } } } }
asyncsocketclient socket收发处理
using system; using system.collections.generic; using system.diagnostics; using system.net; using system.net.sockets; namespace iocpcore { public class asyncsocketclient { public static int iocpreadlen = 1024; public readonly socket connectsocket; protected socketasynceventargs m_receiveeventargs; public socketasynceventargs receiveeventargs { get { return m_receiveeventargs; } set { m_receiveeventargs = value; } } protected byte[] m_asyncreceivebuffer; protected socketasynceventargs m_sendeventargs; public socketasynceventargs sendeventargs { get { return m_sendeventargs; } set { m_sendeventargs = value; } } protected byte[] m_asyncsendbuffer; public event action<asyncsocketclient, byte[]> onreaddata; public event action<asyncsocketclient, int> onsenddata; public event action<asyncsocketclient> onsocketclose; static object releaselock = new object(); public static int createcount = 0; public static int releasecount = 0; ~asyncsocketclient() { lock (releaselock) { releasecount++; } } public asyncsocketclient(socket socket) { lock (releaselock) { createcount++; } connectsocket = socket; m_receiveeventargs = new socketasynceventargs(); m_asyncreceivebuffer = new byte[iocpreadlen]; m_receiveeventargs.acceptsocket = connectsocket; m_receiveeventargs.completed += receiveeventargs_completed; m_sendeventargs = new socketasynceventargs(); m_asyncsendbuffer = new byte[iocpreadlen * 2]; m_sendeventargs.acceptsocket = connectsocket; m_sendeventargs.completed += sendeventargs_completed; } socketclientinfo _clientinfo; public socketclientinfo clientinfo { get { return _clientinfo; } } internal void createclientinfo(netlistener netlistener) { _clientinfo = new socketclientinfo(); try { _clientinfo.tag = netlistener._listenparam._tag; ipendpoint ip = connectsocket.localendpoint as ipendpoint; debug.assert(netlistener._listenparam._port == ip.port); _clientinfo.localip = ip.address.tostring(); _clientinfo.localport = netlistener._listenparam._port; ip = connectsocket.remoteendpoint as ipendpoint; _clientinfo.peerip = ip.address.tostring(); _clientinfo.peerport = ip.port; } catch (exception ex) { netlogger.log("createclientinfo", ex); } } internal void setclientinfo(socketclientinfo clientinfo) { _clientinfo = clientinfo; } #region read process bool _inreadpending = false; public en_socketreadresult readnextdata() { lock (this) { if (_socketerror) return en_socketreadresult.readerror; if (_inreadpending) return en_socketreadresult.inasyn; if(!connectsocket.connected) { onreaderror(); return en_socketreadresult.readerror; } try { m_receiveeventargs.setbuffer(m_asyncreceivebuffer, 0, m_asyncreceivebuffer.length); _inreadpending = true; bool willraiseevent = connectsocket.receiveasync(receiveeventargs); //投递接收请求 if (!willraiseevent) { _inreadpending = false; processreceive(); if (_socketerror) { onreaderror(); return en_socketreadresult.readerror; } return en_socketreadresult.haveread; } else { return en_socketreadresult.inasyn; } } catch (exception ex) { netlogger.log("readnextdata", ex); _inreadpending = false; onreaderror(); return en_socketreadresult.readerror; } } } private void processreceive() { if (receiveeventargs.bytestransferred > 0 && receiveeventargs.socketerror == socketerror.success) { int offset = receiveeventargs.offset; int count = receiveeventargs.bytestransferred; byte[] readdata = new byte[count]; array.copy(m_asyncreceivebuffer, offset, readdata, 0, count); _inreadpending = false; if (!_socketerror) onreaddata?.invoke(this, readdata); } else { _inreadpending = false; onreaderror(); } } private void receiveeventargs_completed(object sender, socketasynceventargs e) { lock (this) { _inreadpending = false; processreceive(); if (_socketerror) { onreaderror(); } } } bool _socketerror = false; private void onreaderror() { lock (this) { if (_socketerror == false) { _socketerror = true; onsocketclose?.invoke(this); } closeclient(); } } #endregion #region send process int _sendbufferbytecount = 102400; public int sendbufferbytecount { get { return _sendbufferbytecount; } set { if (value < 1024) { _sendbufferbytecount = 1024; } else { _sendbufferbytecount = value; } } } sendbufferpool _senddatapool = new sendbufferpool(); internal en_senddataresult putsenddata(byte[] data) { if (_socketerror) return en_senddataresult.no_client; if (_senddatapool._bufferbytecount >= _sendbufferbytecount) { return en_senddataresult.buffer_overflow; } if (data.length <= iocpreadlen) { _senddatapool.putobj(data); } else { list<byte[]> dataitems = splitdata(data, iocpreadlen); foreach (byte[] item in dataitems) { _senddatapool.putobj(item); } } return en_senddataresult.ok; } bool _insendpending = false; public en_socketsendresult sendnextdata() { lock (this) { if (_socketerror) { return en_socketsendresult.senderror; } if (_insendpending) { return en_socketsendresult.inasyn; } int sendbytecount = getsenddata(); if (sendbytecount == 0) { return en_socketsendresult.nosenddata; } //防止抛出异常,否则影响性能 if (!connectsocket.connected) { onsenderror(); return en_socketsendresult.senderror; } try { m_sendeventargs.setbuffer(m_asyncsendbuffer, 0, sendbytecount); _insendpending = true; bool willraiseevent = connectsocket.sendasync(m_sendeventargs); if (!willraiseevent) { _insendpending = false; processsend(m_sendeventargs); if (_socketerror) { onsenderror(); return en_socketsendresult.senderror; } else { onsenddata?.invoke(this, sendbytecount); //继续发下一条 return en_socketsendresult.havesend; } } else { return en_socketsendresult.inasyn; } } catch (exception ex) { netlogger.log("sendnextdata", ex); _insendpending = false; onsenderror(); return en_socketsendresult.senderror; } } } private void sendeventargs_completed(object sender, socketasynceventargs sendeventargs) { lock (this) { try { _insendpending = false; processsend(m_sendeventargs); int sendcount = 0; if (sendeventargs.socketerror == socketerror.success) { sendcount = sendeventargs.bytestransferred; } onsenddata?.invoke(this, sendcount); if (_socketerror) { onsenderror(); } } catch (exception ex) { netlogger.log("sendeventargs_completed", ex); } } } private bool processsend(socketasynceventargs sendeventargs) { if (sendeventargs.socketerror == socketerror.success) { return true; } else { onsenderror(); return false; } } private int getsenddata() { int datalen = 0; while (true) { byte[] data = _senddatapool.getobj(); if (data == null) return datalen; array.copy(data, 0, m_asyncsendbuffer, datalen, data.length); datalen += data.length; if (datalen > iocpreadlen) break; } return datalen; } private void onsenderror() { lock (this) { if (_socketerror == false) { _socketerror = true; onsocketclose?.invoke(this); } closeclient(); } } #endregion internal void closesocket() { try { connectsocket.close(); } catch (exception ex) { netlogger.log("closesocket", ex); } } static object socketcloselock = new object(); public static int closesendcount = 0; public static int closereadcount = 0; bool _disposesend = false; void closesend() { if (!_disposesend && !_insendpending) { lock (socketcloselock) closesendcount++; _disposesend = true; m_sendeventargs.setbuffer(null, 0, 0); m_sendeventargs.completed -= sendeventargs_completed; m_sendeventargs.dispose(); } } bool _disposeread = false; void closeread() { if (!_disposeread && !_inreadpending) { lock (socketcloselock) closereadcount++; _disposeread = true; m_receiveeventargs.setbuffer(null, 0, 0); m_receiveeventargs.completed -= receiveeventargs_completed; m_receiveeventargs.dispose(); } } private void closeclient() { try { closesend(); closeread(); connectsocket.close(); } catch (exception ex) { netlogger.log("closeclient", ex); } } //发送缓冲大小 private list<byte[]> splitdata(byte[] data, int maxlen) { list<byte[]> items = new list<byte[]>(); int start = 0; while (true) { int itemlen = math.min(maxlen, data.length - start); if (itemlen == 0) break; byte[] item = new byte[itemlen]; array.copy(data, start, item, 0, itemlen); items.add(item); start += itemlen; } return items; } } public enum en_socketreadresult { inasyn, haveread, readerror } public enum en_socketsendresult { inasyn, havesend, nosenddata, senderror } class sendbufferpool { objectpool<byte[]> _bufferpool = new objectpool<byte[]>(); public int64 _bufferbytecount = 0; public bool putobj(byte[] obj) { if (_bufferpool.putobj(obj)) { lock (this) { _bufferbytecount += obj.length; } return true; } else { return false; } } public byte[] getobj() { byte[] result = _bufferpool.getobj(); if (result != null) { lock (this) { _bufferbytecount -= result.length; } } return result; } } }
netserver 聚合其他类
using system; using system.collections.generic; using system.diagnostics; using system.linq; using system.net.sockets; using system.threading; namespace iocpcore { public class netserver { public action<socketeventparam> onsocketpacketevent; //每个连接发送缓冲大小 public int sendbufferbyteperclient { get; set; } = 1024 * 100; bool _serverstart = false; list<netlistener> _listlistener = new list<netlistener>(); //负责对收到的字节流 组成完成的包 clientpacketmanage _clientpacketmanage; public int64 sendbytecount { get; set; } public int64 readbytecount { get; set; } list<listenparam> _listlistenport = new list<listenparam>(); public void addlistenport(int port, object tag) { _listlistenport.add(new listenparam(port, tag)); } /// <summary> /// /// </summary> /// <param name="listenfault">监听失败的端口</param> /// <returns></returns> public bool startlisten(out list<int> listenfault) { _serverstart = true; _clientpacketmanage = new clientpacketmanage(this); _clientpacketmanage.onsocketpacketevent += putclientpacket; _netconnectmanage.onsocketconnectevent += socketconnectevent; _listlistener.clear(); thread thread1 = new thread(new threadstart(netpacketprocess)); thread1.start(); thread thread2 = new thread(new threadstart(netsendprocess)); thread2.start(); thread thread3 = new thread(new threadstart(netreadprocess)); thread3.start(); listenfault = new list<int>(); foreach (listenparam param in _listlistenport) { netlistener listener = new netlistener(this); listener._listenparam = param; listener.onacceptsocket += listener_onacceptsocket; if (!listener.startlisten()) { listenfault.add(param._port); } else { _listlistener.add(listener); netlogger.log(string.format("监听成功!端口:{0}", param._port)); } } return listenfault.count == 0; } public void putclientpacket(socketeventparam param) { onsocketpacketevent?.invoke(param); } //获取包的最小长度 int _packetminlen; int _packetmaxlen; public int packetminlen { get { return _packetminlen; } } public int packetmaxlen { get { return _packetmaxlen; } } /// <summary> /// 设置包的最小和最大长度 /// 当minlen=0时,认为是接收字节流 /// </summary> /// <param name="minlen"></param> /// <param name="maxlen"></param> public void setpacketparam(int minlen, int maxlen) { debug.assert(minlen >= 0); debug.assert(maxlen > minlen); _packetminlen = minlen; _packetmaxlen = maxlen; } //获取包的总长度 public delegate int delegate_getpackettotallen(byte[] data, int offset); public delegate_getpackettotallen getpackettotallen_callback; objectpoolwithevent<socketeventparam> _socketeventpool = new objectpoolwithevent<socketeventparam>(); private void netpacketprocess() { while (_serverstart) { try { dealeventpool(); } catch (exception ex) { netlogger.log(string.format("dealeventpool 异常 {0}***{1}", ex.message, ex.stacktrace)); } _socketeventpool.waitone(1000); } } dictionary<socket, asyncsocketclient> _clientgroup = new dictionary<socket, asyncsocketclient>(); public int clientcount { get { lock (_clientgroup) { return _clientgroup.count; } } } public list<socket> clientlist { get { lock (_clientgroup) { return _clientgroup.keys.tolist(); } } } private void dealeventpool() { while (true) { socketeventparam param = _socketeventpool.getobj(); if (param == null) return; if (param.socketevent == en_socketevent.close) { lock (_clientgroup) { _clientgroup.remove(param.socket); } } if (_packetminlen == 0)//字节流处理 { onsocketpacketevent?.invoke(param); } else { //组成一个完整的包 逻辑 _clientpacketmanage.putsocketparam(param); } } } private void socketconnectevent(socketeventparam param, asyncsocketclient client) { try { if (param.socket == null || client == null) //连接失败 { } else { lock (_clientgroup) { bool remove = _clientgroup.remove(client.connectsocket); debug.assert(!remove); _clientgroup.add(client.connectsocket, client); } client.onsocketclose += client_onsocketclose; client.onreaddata += client_onreaddata; client.onsenddata += client_onsenddata; _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read)); } _socketeventpool.putobj(param); } catch (exception ex) { netlogger.log(string.format("socketconnectevent 异常 {0}***{1}", ex.message, ex.stacktrace)); } } internal void onrcvpacketlenerror(socket socket, byte[] buffer, int offset, int packetlen) { try { lock (_clientgroup) { if (!_clientgroup.containskey(socket)) { debug.assert(false); return; } netlogger.log(string.format("报长度异常!包长:{0}", packetlen)); asyncsocketclient client = _clientgroup[socket]; client.closesocket(); } } catch (exception ex) { netlogger.log(string.format("onrcvpacketlenerror 异常 {0}***{1}", ex.message, ex.stacktrace)); } } #region listen port private void listener_onacceptsocket(listenparam listenpatam, asyncsocketclient client) { try { lock (_clientgroup) { bool remove = _clientgroup.remove(client.connectsocket); debug.assert(!remove); _clientgroup.add(client.connectsocket, client); } client.onsocketclose += client_onsocketclose; client.onreaddata += client_onreaddata; client.onsenddata += client_onsenddata; _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read)); socketeventparam param = new socketeventparam(en_socketevent.accept, client.connectsocket); param.clientinfo = client.clientinfo; _socketeventpool.putobj(param); } catch (exception ex) { netlogger.log(string.format("listener_onacceptsocket 异常 {0}***{1}", ex.message, ex.stacktrace)); } } objectpoolwithevent<socketeventdeal> _listsendevent = new objectpoolwithevent<socketeventdeal>(); private void netsendprocess() { while (true) { dealsendevent(); _listsendevent.waitone(1000); } } objectpoolwithevent<socketeventdeal> _listreadevent = new objectpoolwithevent<socketeventdeal>(); private void netreadprocess() { while (true) { dealreadevent(); _listreadevent.waitone(1000); } } private void dealsendevent() { while (true) { socketeventdeal item = _listsendevent.getobj(); if (item == null) break; switch (item.socketevent) { case en_socketdealevent.send: { while (true) { en_socketsendresult result = item.client.sendnextdata(); if (result == en_socketsendresult.havesend) continue; else break; } } break; case en_socketdealevent.read: { debug.assert(false); } break; } } } private void dealreadevent() { while (true) { socketeventdeal item = _listreadevent.getobj(); if (item == null) break; switch (item.socketevent) { case en_socketdealevent.read: { while (true) { en_socketreadresult result = item.client.readnextdata(); if (result == en_socketreadresult.haveread) continue; else break; } } break; case en_socketdealevent.send: { debug.assert(false); } break; } } } private void client_onreaddata(asyncsocketclient client, byte[] readdata) { //读下一条 _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read)); try { socketeventparam param = new socketeventparam(en_socketevent.read, client.connectsocket); param.clientinfo = client.clientinfo; param.data = readdata; _socketeventpool.putobj(param); lock (this) { readbytecount += readdata.length; } } catch (exception ex) { netlogger.log(string.format("client_onreaddata 异常 {0}***{1}", ex.message, ex.stacktrace)); } } #endregion private void client_onsenddata(asyncsocketclient client, int sendcount) { //发送下一条 _listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send)); lock (this) { sendbytecount += sendcount; } } private void client_onsocketclose(asyncsocketclient client) { try { socketeventparam param = new socketeventparam(en_socketevent.close, client.connectsocket); param.clientinfo = client.clientinfo; _socketeventpool.putobj(param); } catch (exception ex) { netlogger.log(string.format("client_onsocketclose 异常 {0}***{1}", ex.message, ex.stacktrace)); } } /// <summary> /// 放到发送缓冲 /// </summary> /// <param name="socket"></param> /// <param name="data"></param> /// <returns></returns> public en_senddataresult senddata(socket socket, byte[] data) { if (socket == null) return en_senddataresult.no_client; lock (_clientgroup) { if (!_clientgroup.containskey(socket)) return en_senddataresult.no_client; asyncsocketclient client = _clientgroup[socket]; en_senddataresult result = client.putsenddata(data); if (result == en_senddataresult.ok) { //发送下一条 _listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send)); } return result; } } /// <summary> /// 设置某个连接的发送缓冲大小 /// </summary> /// <param name="socket"></param> /// <param name="bytecount"></param> /// <returns></returns> public bool setclientsendbuffer(socket socket, int bytecount) { lock (_clientgroup) { if (!_clientgroup.containskey(socket)) return false; asyncsocketclient client = _clientgroup[socket]; client.sendbufferbytecount = bytecount; return true; } } #region connect process netconnectmanage _netconnectmanage = new netconnectmanage(); /// <summary> /// 异步连接一个客户端 /// </summary> /// <param name="peerip"></param> /// <param name="peerport"></param> /// <param name="tag"></param> /// <returns></returns> public bool connectasyn(string peerip, int peerport, object tag) { return _netconnectmanage.connectasyn(peerip, peerport, tag); } /// <summary> /// 同步连接一个客户端 /// </summary> /// <param name="peerip"></param> /// <param name="peerport"></param> /// <param name="tag"></param> /// <param name="socket"></param> /// <returns></returns> public bool connect(string peerip, int peerport, object tag, out socket socket) { return _netconnectmanage.connect(peerip, peerport, tag, out socket); } #endregion } enum en_socketdealevent { read, send, } class socketeventdeal { public asyncsocketclient client { get; set; } public en_socketdealevent socketevent { get; set; } public socketeventdeal(asyncsocketclient client, en_socketdealevent socketevent) { client = client; socketevent = socketevent; } } }
库的使用
使用起来非常简单,示例如下
using iocpcore; using system; using system.collections.generic; using system.linq; using system.net.sockets; using system.text; using system.threading.tasks; using system.windows; namespace warningclient { public class socketserver { public action<socketeventparam> onsocketevent; public int64 sendbytecount { get { if (_netserver == null) return 0; return _netserver.sendbytecount; } } public int64 readbytecount { get { if (_netserver == null) return 0; return _netserver.readbytecount; } } netserver _netserver; en_packettype _packettype = en_packettype.bytestream; public void setpackttype(en_packettype packettype) { _packettype = packettype; if (_netserver == null) return; if (packettype == en_packettype.bytestream) { _netserver.setpacketparam(0, 1024); } else { _netserver.setpacketparam(9, 1024); } } public bool init(list<int> listenport) { netlogger.onlogevent += netlogger_onlogevent; _netserver = new netserver(); setpackttype(_packettype); _netserver.getpackettotallen_callback += getpackettotallen; _netserver.onsocketpacketevent += socketpacketdeal; foreach (int n in listenport) { _netserver.addlistenport(n, n); } list<int> listenfault; bool start = _netserver.startlisten(out listenfault); return start; } int getpackettotallen(byte[] data, int offset) { if (mainwindow._packettype == en_packettype.znss) return getpacketznss(data, offset); else return getpacketanzhiyuan(data, offset); } int getpacketanzhiyuan(byte[] data, int offset) { int n = data[offset + 5] + 6; return n; } int getpacketznss(byte[] data, int offset) { int packetlen = (int)(data[4]) + 5; return packetlen; } public bool connectasyn(string peerip, int peerport, object tag) { return _netserver.connectasyn(peerip, peerport, tag); } public bool connect(string peerip, int peerport, object tag, out socket socket) { return _netserver.connect(peerip, peerport, tag, out socket); } private void netlogger_onlogevent(string message) { applog.log(message); } dictionary<socket, socketeventparam> _clientgroup = new dictionary<socket, socketeventparam>(); public int clientcount { get { lock (_clientgroup) { return _clientgroup.count; } } } public list<socket> clientlist { get { if (_netserver != null) return _netserver.clientlist; return new list<socket>(); } } void addclient(socketeventparam socketparam) { lock (_clientgroup) { _clientgroup.remove(socketparam.socket); _clientgroup.add(socketparam.socket, socketparam); } } void removeclient(socketeventparam socketparam) { lock (_clientgroup) { _clientgroup.remove(socketparam.socket); } } objectpool<socketeventparam> _readdatapool = new objectpool<socketeventparam>(); public objectpool<socketeventparam> readdatapool { get { return _readdatapool; } } private void socketpacketdeal(socketeventparam socketparam) { onsocketevent?.invoke(socketparam); if (socketparam.socketevent == en_socketevent.read) { if (mainwindow._isshowreadpacket) _readdatapool.putobj(socketparam); } else if (socketparam.socketevent == en_socketevent.accept) { addclient(socketparam); string peerip = socketparam.clientinfo.peeripport; applog.log(string.format("客户端链接!本地端口:{0},对端:{1}", socketparam.clientinfo.localport, peerip)); } else if (socketparam.socketevent == en_socketevent.connect) { string peerip = socketparam.clientinfo.peeripport; if (socketparam.socket != null) { addclient(socketparam); applog.log(string.format("连接对端成功!本地端口:{0},对端:{1}", socketparam.clientinfo.localport, peerip)); } else { applog.log(string.format("连接对端失败!本地端口:{0},对端:{1}", socketparam.clientinfo.localport, peerip)); } } else if (socketparam.socketevent == en_socketevent.close) { mainwindow.mainwnd.onsocketdisconnect(socketparam.socket); removeclient(socketparam); string peerip = socketparam.clientinfo.peeripport; applog.log(string.format("客户端断开!本地端口:{0},对端:{1},", socketparam.clientinfo.localport, peerip)); } } public en_senddataresult senddata(socket socket, byte[] data) { if(socket == null) { messagebox.show("还没连接!"); return en_senddataresult.no_client; } return _netserver.senddata(socket, data); } internal void sendtoall(byte[] data) { lock (_clientgroup) { foreach (socket socket in _clientgroup.keys) { senddata(socket, data); } } } } }
以上这篇c#中一个高性能异步socket封装库的实现思路分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。