欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

高性能TcpServer - 2.创建高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)

程序员文章站 2022-06-22 11:02:07
高性能TcpServer - 1.网络通信协议;高性能TcpServer - 2.创建高性能Socket服务器SocketAsyncEventArgs的实现(IOCP);高性能TcpServer - 3.命令通道(处理:掉包,粘包,垃圾包);高性能TcpServer - 4.文件通道(处理:文件分包 ......

高性能TcpServer - 1.网络通信协议

高性能TcpServer - 2.创建高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)

高性能TcpServer - 3.命令通道(处理:掉包,粘包,垃圾包)

高性能TcpServer - 4.文件通道(处理:文件分包,支持断点续传)

高性能TcpServer - 5.客户端管理

高性能TcpServer - 6.代码下载

 

socketasynceventargs对象管理 -- 用于checkout/checkin socketasynceventargs对象

// Usage sketch: create the pool sized by max_clientcount, check an event-args object out, then check it back in.
socketargspool socketargspool = new socketargspool(max_clientcount);

this.m_eventargs = this.m_socketargspool.checkout();// initialize the object

// NOTE(review): this check-in targets m_bufferpool, not m_socketargspool, and is identical to the
// line shown for the buffer pool below — possibly a copy-paste slip; confirm which pool is intended.
this.m_bufferpool.checkin(m_eventargs);// recycle the object

 

socketargsbufferpool对象管理 -- 用于checkout/checkin socketasynceventargs的buffer

// Usage sketch: create the buffer pool from max_clientcount and max_clientbuffersize, then
// check a buffer out for / back in from the event-args object.
socketargsbufferpool bufferpool = new socketargsbufferpool(max_clientcount, max_clientbuffersize);

this.m_bufferpool.checkout(this.m_eventargs);// set the buffer (setbuffer)

this.m_bufferpool.checkin(m_eventargs);// recycle the object

 

socketentitypool对象管理 -- 用于checkout/checkin socketentity

// Usage sketch: check a socket entity out of the pool, attach the client socket, and take the
// entity's receive buffer, handler and analysis objects for this client.
socketentitypool socketentitypool = new socketentitypool(max_clientcount, max_clientbuffersize);// initialize

m_socketentity = this.m_socketentitypool.checkout();

m_socketentity.socketclient = socket;

m_bufferrecv = m_socketentity.bufferrecv; m_bufferrecv.clear();// per-client receive buffer

m_handle = m_socketentity.protocolhandle;// per-client handler class

m_analysis = m_socketentity.protocolanalysis;// per-client parser class

// NOTE(review): the check-in passes `socketentity`, while the check-out above stored into
// `m_socketentity` — presumably the same object seen from another scope; confirm.
this.m_socketentitypool.checkin(socketentity);// recycle the object

 

部分代码

服务器监听和接收客户端连接

// Opens the listening socket on the given port (any local address) and posts the first accept.
// port: TCP port number to bind the listener to.
public void start(int port)
        {
            // Create the TCP stream socket and keep it in the field before touching it,
            // so the field is set even if bind/listen below throws.
            this.m_listenersocket = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
            socket listener = this.m_listenersocket;

            // Bind to every local interface on the requested port.
            listener.bind(new ipendpoint(ipaddress.any, port));

            // Start listening; 100 is the backlog argument passed to listen.
            listener.listen(100);

            // Kick off the asynchronous accept loop with the shared listener args.
            listenforconnection(m_listenerargs);
        }

 

        void listenforconnection(socketasynceventargs args)

        {

            lock (this)

            {

                args.acceptsocket = null;

                m_listenersocket.invokeasyncmethod(new socketasyncmethod(m_listenersocket.acceptasync), acceptasynccompleted, args);

            }

        }

 

        void acceptasynccompleted(object sender, socketasynceventargs e)

        {

            if (e.socketerror == socketerror.operationaborted)

            {

                cloghelp.appendlog("[error] acceptasynccompleted:socketerror.operationaborted");

                return; //server was stopped

            }

 

            if (e.socketerror == socketerror.success)

            {

                socket acceptsocket = e.acceptsocket;

                if (acceptsocket != null)

                {

                    if (connections + 1 <= max_clientcount)

                    {

                        ipendpoint clientep = (ipendpoint)acceptsocket.remoteendpoint;

                        sn = string.format("{0}:{1}", clientep.address.tostring(), clientep.port);

                        lock (lockindex)

                        {

                            connections = interlocked.increment(ref connections);

                            program.addmessage("已连接,sn:" + sn + ",当前连接数:" + cserverintance.connections.tostring());

                        }

                        csocketdao socketdao = new csocketdao(socketargspool, bufferpool, socketentitypool, acceptsocket, sn);

                        csingleton<cclientmgr>.getinstance().addonlineclient(socketdao);

                    }

                    else

                    {

                        program.addmessage("超过最大连接数:" + max_clientcount.tostring() + ",拒接连接");

                    }

                }

            }

            

            //continue to accept!

            listenforconnection(e);

        }

服务器数据处理

 void receiveasynccompleted(object sender, socketasynceventargs e)

        {

            if (!this.m_connected) return;

 

            try

            {

                m_eventargs = e;

 

                if (m_eventargs.bytestransferred == 0)

                {

                    socketcatcherror("bytestransferred=0"); //graceful disconnect

                    return;

                }

                if (m_eventargs.socketerror != socketerror.success)

                {

                    socketcatcherror("socketerror=" + (e.socketerror).tostring()); //not graceful disconnect

                    return;

                }

 

                //数据存储

                recvtime = datetime.now;

                m_bufferrecv.put(e);

                m_analysis.bagstatus = cprotocolanalysis.ebagstatus.bagnone;

 

                // 粘包处理

                while (m_bufferrecv.hasremaining())

                {

                    // 掉包处理

                    if (cprotocolanalysis.ebagstatus.baglost == m_analysis.bagstatus) break;

 

                    m_handle.process(m_bufferrecv, m_analysis, m_strsn);// 数据解析(垃圾包处理)

 

                    if (string.isnullorempty(m_struid))

                    {

                        if (!string.isnullorempty(m_analysis.uid))

                        {

                            m_struid = m_analysis.uid;

                            csingleton<cclientmgr>.getinstance().addclientuid(m_struid, m_strsn, this);

                        }

                    }

 

                    if (m_analysis.whethertosend)

                    {

                        string data = cprotocolbase.getprotocol(m_analysis);

                        sendrealtime(data);

                    }

                }

   

                listenfordata(e);

            }

            catch (exception ex)

            {

                cloghelp.appendlog("[error] receiveasynccompleted,errmsg:" + ex.message);

            }

        }