C#完成端口(IOCP)
程序员文章站
2022-04-15 21:29:42
Pool /// /// 与每个客户Socket相关联,进行Send和Receive投递时所需要的参数 /// public class IoContextPool { List pool; //为每一个Socke ......
pool
/// <summary> /// 与每个客户socket相关联,进行send和receive投递时所需要的参数 /// </summary> public class iocontextpool { list<socketasynceventargs> pool; //为每一个socket客户端分配一个socketasynceventargs,用一个list管理,在程序启动时建立。 int32 capacity; //pool对象池的容量 int32 boundary; //已分配和未分配对象的边界,大的是已经分配的,小的是未分配的 public iocontextpool(int32 capacity) { this.pool = new list<socketasynceventargs>(capacity); this.boundary = 0; this.capacity = capacity; } /// <summary> /// 往pool对象池中增加新建立的对象,因为这个程序在启动时会建立好所有对象, /// 故这个方法只在初始化时会被调用,因此,没有加锁。 /// </summary> /// <param name="arg"></param> /// <returns></returns> public bool add(socketasynceventargs arg) { if (arg != null && pool.count < capacity) { pool.add(arg); boundary++; return true; } else return false; } /// <summary> /// 取出集合中指定对象,内部使用 /// </summary> /// <param name="index"></param> /// <returns></returns> //internal socketasynceventargs get(int index) //{ // if (index >= 0 && index < capacity) // return pool[index]; // else // return null; //} /// <summary> /// 从对象池中取出一个对象,交给一个socket来进行投递请求操作 /// </summary> /// <returns></returns> public socketasynceventargs pop() { lock (this.pool) { if (boundary > 0) { --boundary; return pool[boundary]; } else return null; } } /// <summary> /// 一个socket客户断开,与其相关的iocontext被释放,重新投入pool中,备用。 /// </summary> /// <param name="arg"></param> /// <returns></returns> public bool push(socketasynceventargs arg) { if (arg != null) { lock (this.pool) { int index = this.pool.indexof(arg, boundary); //找出被断开的客户,此处一定能查到,因此index不可能为-1,必定要大于0。 if (index == boundary) //正好是边界元素 boundary++; else { this.pool[index] = this.pool[boundary]; //将断开客户移到边界上,边界右移 this.pool[boundary++] = arg; } } return true; } else return false; } }
server
public partial class iocpserver : form { private delegate void setrichtextboxcallback(string str); private setrichtextboxcallback setrichtextboxcallback; public iocpserver() { setrichtextboxcallback = new setrichtextboxcallback(setrichtextboxreceive); initializecomponent(); } /// <summary> /// // 监听socket,用于接受客户端的连接请求 /// </summary> socket socketlistener; /// <summary> /// // 用于服务器执行的互斥同步对象 /// </summary> private static mutex mutex = new mutex(); //完成端口上进行投递所用的iocontext对象池 //private iocontextpool iocontextpool; // /// <summary> /// 服务器上连接的客户端总数 /// </summary> private int32 numconnectedsockets; /// <summary> /// 服务器能接受的最大连接数量 /// </summary> private int32 numconnections = 8192; /// <summary> /// 用于每个i/o socket操作的缓冲区大小 /// </summary> private int32 buffersize = 4028; /// <summary> /// 端口 /// </summary> private int32 bufferport = convert.toint32(configurationmanager.appsettings["serviceport"]); //ip private string _getaddress = configurationmanager.appsettings["serviceaddress"]; //所有设备用户信息 //private list<equipment> listinfo = new list<equipment>(); /// <summary> /// 所有设备用户信息 /// </summary> private list<clientinformation> listinfo = new list<clientinformation>(); /// <summary> /// 输出实体类 /// </summary> httpdate hdate = new httpdate(); //完成端口上进行投递所用的iocontext对象池 private iocontextpool iocontextpool; datetime getdate; timespan udptime; string filetxt = application.startuppath + @"\filetxt"; string filename ; private void iocpserver_load(object sender, eventargs e) { //获取所有设备用户信息 //listinfo = adogetinfo.getequipmentuser(); getdate = datetime.now.addhours(-1); filesave(); this.numconnectedsockets = 0; this.iocontextpool = new iocontextpool(numconnections); // 为iocontextpool预分配socketasynceventargs对象 for (int32 i = 0; i < this.numconnections; i++) { socketasynceventargs iocontext = new socketasynceventargs(); iocontext.completed += new eventhandler<socketasynceventargs>(oniocompleted); iocontext.setbuffer(new byte[this.buffersize], 0, this.buffersize); // 将预分配的对象加入socketasynceventargs对象池中 this.iocontextpool.add(iocontext); } // 获得主机相关信息 ipaddress[] addresslist = dns.gethostentry(environment.machinename).addresslist; ipendpoint localendpoint = new ipendpoint(addresslist[addresslist.length - 1], bufferport); // 创建监听socket this.socketlistener = new socket(localendpoint.addressfamily, sockettype.stream, protocoltype.tcp); this.socketlistener.receivebuffersize = this.buffersize; this.socketlistener.sendbuffersize = this.buffersize; if (localendpoint.addressfamily == addressfamily.internetworkv6) { // 配置监听socket为 dual-mode (ipv4 & ipv6) // 27 is equivalent to ipv6_v6only socket option in the winsock snippet below, this.socketlistener.setsocketoption(socketoptionlevel.ipv6, (socketoptionname)27, false); this.socketlistener.bind(new ipendpoint(ipaddress.ipv6any, localendpoint.port)); } else { this.socketlistener.bind(localendpoint); } // 开始监听 this.socketlistener.listen(this.numconnections); // 在监听socket上投递一个接受请求。 this.startaccept(null); // blocks the current thread to receive incoming messages. mutex.waitone(); rtboxinformation.invoke(setrichtextboxcallback, "服务器开始监听"); } /// <summary> /// 监听socket接受处理 /// </summary> /// <param name="e">socketasynceventarg associated with the completed accept operation.</param> private void processaccept(socketasynceventargs e) { socket s = e.acceptsocket; if (s.connected) { try { socketasynceventargs iocontext = this.iocontextpool.pop(); if (iocontext != null) { // 从接受的客户端连接中取数据配置iocontext //iocontext.completed += new eventhandler<socketasynceventargs>(oniocompleted); //byte [] by=new byte[]{}; //iocontext.setbuffer(by, 0, by.length); //iocontext.usertoken = s; // 从接受的客户端连接中取数据配置iocontext iocontext.usertoken = s; interlocked.increment(ref this.numconnectedsockets); string outstr = string.format("客户 {0} 连入, 共有 {1} 个连接。", s.remoteendpoint.tostring(), this.numconnectedsockets); rtboxinformation.invoke(setrichtextboxcallback, outstr); if (!s.receiveasync(iocontext)) { this.processreceive(iocontext); } } else //已经达到最大客户连接数量,在这接受连接,发送“连接已经达到最大数”,然后断开连接 { s.send(encoding.default.getbytes("连接已经达到最大数!")); string outstr = string.format("连接已满,拒绝 {0} 的连接。", s.remoteendpoint); rtboxinformation.invoke(setrichtextboxcallback, outstr); s.close(); } } catch (socketexception ex) { socket token = e.usertoken as socket; string outstr = string.format("接收客户 {0} 数据出错, 异常信息: {1} 。", token.remoteendpoint, ex.tostring()); adoinserttemp.addservererrorlog("接收客户数据出错:[icopserver代码行号177]" + ex.message); rtboxinformation.invoke(setrichtextboxcallback, outstr); } catch (exception ex) { rtboxinformation.invoke(setrichtextboxcallback, ex.message); adoinserttemp.addservererrorlog("监听socket接受处理:[icopserver代码行号182]" + ex.message); } // 投递下一个接受请求 this.startaccept(e); } } /// <summary> /// 从客户端开始接受一个连接操作 /// </summary> /// <param name="accepteventarg">the context object to use when issuing /// the accept operation on the server's listening socket.</param> private void startaccept(socketasynceventargs accepteventarg) { if (accepteventarg == null) { accepteventarg = new socketasynceventargs(); accepteventarg.completed += new eventhandler<socketasynceventargs>(onacceptcompleted); } else { // 重用前进行对象清理 accepteventarg.acceptsocket = null; } if (!this.socketlistener.acceptasync(accepteventarg)) { this.processaccept(accepteventarg); } } /// <summary> ///接收完成时处理函数 /// </summary> /// <param name="e">与接收完成操作相关联的socketasynceventarg对象</param> private void processreceive(socketasynceventargs e) { // 检查远程主机是否关闭连接 if (e.bytestransferred > 0) { if (e.socketerror == socketerror.success) { socket s = (socket)e.usertoken; clientinformation client = hdate.addclient(listinfo, s.remoteendpoint.tostring(), e.bytestransferred, e.buffer); //判断所有需接收的数据是否已经完成 if (s.available == 0) { ipendpoint localep = s.remoteendpoint as ipendpoint; // 设置发送数据 byte[] _endread = new byte[client.transferred]; bool isclose = false; client = hdate.getclient(listinfo,client); string strtext = client.rend; //encoding.utf8.getstring(e.buffer, 0, client.transferred); requesttype requesttype = hdate.request_type(strtext);//数据类型 requestdeal requestdeal = hdate.request_deal(strtext);//命令方式 byte[] data = new byte[4028]; //初始化 if (requesttype == requesttype.typeget && requestdeal == requestdeal.getconfiguration) { rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext)); data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.responsegetfromnew.replace("[getsn]", client.devicessn)))); e.setbuffer(data, e.offset, data.length); rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data))); } else if (requesttype == requesttype.typeget && requestdeal == requestdeal.getinfo) { rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext)); data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok))); e.setbuffer(data, e.offset, data.length); rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data))); } //是否有命令发送 else if (requesttype == requesttype.typeget && requestdeal == requestdeal.getorders) { rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext)); if (client.waitingname != null) { if (client.waitingname.count > 0) { data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(client.waitingname[0]))); } else { data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok))); } } else { data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok))); } e.setbuffer(data, e.offset, data.length); rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data))); } //返回值说明:0 命令执行成功-1 参数错误-3 存取错误 else if (requesttype == requesttype.typepost && requestdeal == requestdeal.postinfo) { hdate.removelength(client, strtext); rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext)); data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok))); e.setbuffer(data, e.offset, data.length); rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data))); } //post发送数据命令 else if (requesttype == requesttype.typepost && requestdeal == requestdeal.postatttable) { hdate.gettable(strtext, client); rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext)); data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok))); e.setbuffer(data, e.offset, data.length); rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data))); } else { if (!client.isdata) { hdate.removelength(client, strtext); rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext)); data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok))); e.setbuffer(data, e.offset, data.length); rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data))); } else { hdate.gettable(strtext, client); } } try { if (!s.sendasync(e)) //投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发socketasynceventargs.completed事件 { // 同步发送时处理发送完成事件 this.processsend(e, isclose); } if (client != null && !client.isdata) { hdate.addatt(client, listinfo); thread.sleep(6000); this.closeclientsocket(s, e); } } catch (exception ex) { adoinserttemp.addservererrorlog("接收完成时处理函数:[icopserver代码行号330]" + ex.message); } } else if (!s.receiveasync(e)) //为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发socketasynceventargs.completed事件 { // 同步接收时处理接收完成事件 this.processreceive(e); } } else { this.processerror(e); } } else { this.closeclientsocket(e); } } /// <summary> /// 发送完成时处理函数 /// </summary> /// <param name="e">与发送完成操作相关联的socketasynceventarg对象</param> private void processsend(socketasynceventargs e, bool isreceive) { try { if (e.socketerror == socketerror.success) { socket s = (socket)e.usertoken; if (s != null) { //this.closeclientsocket(s, e); //接收时根据接收的字节数收缩了缓冲区的大小,因此投递接收请求时,恢复缓冲区大小 //e.setbuffer(new byte[buffer_size], 0, buffer_size); e.setbuffer(0, buffersize); if (!s.receiveasync(e)) //投递接收请求 { // 同步接收时处理接收完成事件 this.processreceive(e); } } } else { this.processerror(e); } } catch (exception ex) { rtboxinformation.invoke(setrichtextboxcallback, ex.message); adoinserttemp.addservererrorlog("发送完成时处理函数:[icopserver代码行号390]" + ex.message); this.processerror(e); } } /// <summary> /// 当socket上的发送或接收请求被完成时,调用此函数 /// </summary> /// <param name="sender">激发事件的对象</param> /// <param name="e">与发送或接收完成操作相关联的socketasynceventarg对象</param> private void oniocompleted(object sender, socketasynceventargs e) { // determine which type of operation just completed and call the associated handler. switch (e.lastoperation) { case socketasyncoperation.receive: this.processreceive(e); break; case socketasyncoperation.send: this.processsend(e,true); break; default: throw new argumentexception("the last operation completed on the socket was not a receive or send"); } } /// <summary> /// 处理socket错误 /// </summary> /// <param name="e"></param> private void processerror(socketasynceventargs e) { try { socket s = e.usertoken as socket; ipendpoint localep = s.localendpoint as ipendpoint; this.closeclientsocket(s, e); string outstr = string.format("套接字错误 {0}, ip {1}, 操作 {2}。", (int32)e.socketerror, localep, e.lastoperation); rtboxinformation.invoke(setrichtextboxcallback, outstr); } catch (exception ex) { adoinserttemp.addservererrorlog("处理socket错误:[icopserver代码行号431]" + ex.message); } } /// <summary> /// 关闭socket连接 /// </summary> /// <param name="e">socketasynceventarg associated with the completed send/receive operation.</param> private void closeclientsocket(socketasynceventargs e) { socket s = e.usertoken as socket; this.closeclientsocket(s, e); } /// <summary> /// accept 操作完成时回调函数 /// </summary> /// <param name="sender">object who raised the event.</param> /// <param name="e">socketasynceventarg associated with the completed accept operation.</param> private void onacceptcompleted(object sender, socketasynceventargs e) { this.processaccept(e); } private void closeclientsocket(socket s, socketasynceventargs e) { try { if (s != null && this.numconnectedsockets > 0) { interlocked.decrement(ref this.numconnectedsockets); // socketasynceventarg 对象被释放,压入可重用队列。 this.iocontextpool.push(e); string outstr = string.format("客户 {0} 断开, 共有 {1} 个连接。", s.remoteendpoint.tostring(), this.numconnectedsockets); rtboxinformation.invoke(setrichtextboxcallback, outstr); try { s.shutdown(socketshutdown.send); s.disconnect(true); } catch (exception ex) { rtboxinformation.invoke(setrichtextboxcallback, ex.message); adoinserttemp.addservererrorlog("sokect关闭:[icopserver代码行号477]" + ex.message); } finally { s.close(); } } } catch (exception ex) { adoinserttemp.addservererrorlog("sokect关闭:[icopserver代码行号467]" + ex.message); } } private void setrichtextboxreceive(string str) { //show txt rtboxinformation.appendtext(str); //do right rtboxinformation.select(this.rtboxinformation.textlength, 0); //do down rtboxinformation.scrolltocaret(); //new row rtboxinformation.appendtext("\r\n"); filesave(); } private void filesave() { timespan udptime=datetime.now-getdate; if(udptime.hours>=1) { filestream fs = null; streamwriter sw = null; filename = filetxt + datetime.now.tostring("yyyymmddhh"); if(!file.exists(filename)) { directory.createdirectory(filename); } fs = new filestream(filename + @"\log_"+datetime.now.tostring("yyyymmddhhmmssfff") + ".txt", filemode.create); sw = new streamwriter(fs); sw.write(rtboxinformation.text); sw.close(); fs.close(); rtboxinformation.clear(); getdate = datetime.now; } } private void iocpserver_formclosing(object sender, formclosingeventargs e) { e.cancel = true; this.hide(); } private void notifyicon_mousedoubleclick(object sender, mouseeventargs e) { this.show(); windowstate = formwindowstate.normal; } }
更新中....
上一篇: shell脚本函数知识与实践
推荐阅读
-
C# 10分钟完成百度图片提取文字(文字识别)入门篇
-
用C#对ADO.NET数据库完成简单操作的方法
-
C# 10分钟完成百度人脸识别——入门篇
-
《C#并发编程经典实例》学习笔记—2.2 返回完成的任务
-
C#利用 HttpWebRequest 类发送post请求,出现“套接字(协议/网络地址/端口)只允许使用一次”问题
-
c# AcceptEx与完成端口(IOCP)结合的示例
-
C# 10分钟完成百度图片提取文字(文字识别)——入门篇
-
C# 循环时,操作另外一个进程直到操作完成,循环继续执行
-
C# 练习题 利用条件运算符的嵌套来完成分数等级划分
-
c#,定义一个函数过程调用,完成计算1!+2!+3!...+n!的值,n为输入的任意整数。