高性能TcpServer - 2.创建高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)
高性能tcpserver - 2.创建高性能socket服务器socketasynceventargs的实现(iocp)
高性能tcpserver - 3.命令通道(处理:掉包,粘包,垃圾包)
高性能tcpserver - 4.文件通道(处理:文件分包,支持断点续传)
socketasynceventargs对象管理 -- 用于checkout/checkin socketasynceventargs对象
// Create the pool of socketasynceventargs objects, constructed with max_clientcount.
socketargspool socketargspool = new socketargspool(max_clientcount);
// Check an event-args object out of the pool (initialize the object).
this.m_eventargs = this.m_socketargspool.checkout();
// Return the event-args object to the pool it was checked out of
// (m_socketargspool, not the buffer pool m_bufferpool).
this.m_socketargspool.checkin(m_eventargs);
socketargsbufferpool对象管理 -- 用于checkout/checkin socketasynceventargs的buffer
// Create the buffer pool, constructed with max_clientcount and max_clientbuffersize.
socketargsbufferpool bufferpool = new socketargsbufferpool(max_clientcount, max_clientbuffersize);
this.m_bufferpool.checkout(this.m_eventargs);// assign the buffer (setbuffer) to the event args
this.m_bufferpool.checkin(m_eventargs);// return the object to the pool
socketentitypool对象管理 -- 用于checkout/checkin socketentity
// Create the entity pool, constructed with max_clientcount and max_clientbuffersize.
socketentitypool socketentitypool = new socketentitypool(max_clientcount, max_clientbuffersize);// initialize
// Check an entity out of the pool and attach the client socket to it.
m_socketentity = this.m_socketentitypool.checkout();
m_socketentity.socketclient = socket;
m_bufferrecv = m_socketentity.bufferrecv; m_bufferrecv.clear();// per-client receive buffer
m_handle = m_socketentity.protocolhandle;// per-client handler class
m_analysis = m_socketentity.protocolanalysis;// per-client parser class
this.m_socketentitypool.checkin(socketentity);// return the object to the pool
部分代码
服务器监听和接收客户端连接
// Starts the server: creates a TCP stream socket, binds it to the given port
// on any local address, listens with a backlog of 100, and then calls
// listenforconnection with m_listenerargs.
//   port - local port number to bind the listener to.
public void start(int port)
{
    ipendpoint localendpoint = new ipendpoint(ipaddress.any, port);

    m_listenersocket = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);
    m_listenersocket.bind(localendpoint);
    m_listenersocket.listen(100);

    listenforconnection(m_listenerargs);
}
// Posts an asynchronous accept on the listener socket using the supplied
// event args; acceptasynccompleted is passed as the completion handler.
//   args - event-args object used for the accept operation.
void listenforconnection(socketasynceventargs args)
{
    lock (this)
    {
        // Reset acceptsocket before the args object is used for another accept.
        args.acceptsocket = null;

        socketasyncmethod acceptmethod = new socketasyncmethod(m_listenersocket.acceptasync);
        m_listenersocket.invokeasyncmethod(acceptmethod, acceptasynccompleted, args);
    }
}
// Completion handler for an asynchronous accept.
//   - operationaborted: logs and returns without posting another accept.
//   - success with a non-null accepted socket: registers the client if the
//     connection limit allows it, otherwise rejects and closes the socket.
//   - in every non-aborted case another accept is posted at the end.
//   sender - source of the completion event (not used here).
//   e      - event args carrying the accept result.
void acceptasynccompleted(object sender, socketasynceventargs e)
{
    if (e.socketerror == socketerror.operationaborted)
    {
        cloghelp.appendlog("[error] acceptasynccompleted:socketerror.operationaborted");
        return; // server was stopped
    }
    if (e.socketerror == socketerror.success)
    {
        socket acceptsocket = e.acceptsocket;
        if (acceptsocket != null)
        {
            if (connections + 1 <= max_clientcount)
            {
                ipendpoint clientep = (ipendpoint)acceptsocket.remoteendpoint;
                sn = string.format("{0}:{1}", clientep.address.tostring(), clientep.port);
                lock (lockindex)
                {
                    // The increment itself updates the field atomically. Writing the
                    // returned value back into the field would be a second, non-atomic
                    // store that could overwrite a concurrent update made elsewhere.
                    interlocked.increment(ref connections);
                    program.addmessage("已连接,sn:" + sn + ",当前连接数:" + cserverintance.connections.tostring());
                }
                csocketdao socketdao = new csocketdao(socketargspool, bufferpool, socketentitypool, acceptsocket, sn);
                csingleton<cclientmgr>.getinstance().addonlineclient(socketdao);
            }
            else
            {
                program.addmessage("超过最大连接数:" + max_clientcount.tostring() + ",拒接连接");
                // Close the rejected connection; otherwise the accepted socket is
                // never released and the remote client is left hanging.
                acceptsocket.close();
            }
        }
    }
    // continue to accept!
    listenforconnection(e);
}
服务器数据处理
// Completion handler for an asynchronous receive on one client connection.
// Stores the received data in the client's receive buffer, runs the protocol
// handler over it until the buffer is drained or an incomplete packet is
// detected, sends any reply the parser requests, and then calls
// listenfordata to continue receiving.
//   sender - source of the completion event (not used here).
//   e      - event args carrying the receive result.
void receiveasynccompleted(object sender, socketasynceventargs e)
{
    if (!this.m_connected) return;
    try
    {
        m_eventargs = e;
        // Check the socket error first: a failed receive normally reports zero
        // bytes transferred as well, so testing the byte count first would
        // report errors as a graceful disconnect.
        if (m_eventargs.socketerror != socketerror.success)
        {
            socketcatcherror("socketerror=" + (e.socketerror).tostring()); // not graceful disconnect
            return;
        }
        if (m_eventargs.bytestransferred == 0)
        {
            socketcatcherror("bytestransferred=0"); // graceful disconnect
            return;
        }
        // store the received data
        recvtime = datetime.now;
        m_bufferrecv.put(e);
        m_analysis.bagstatus = cprotocolanalysis.ebagstatus.bagnone;
        // sticky-packet handling: keep parsing while the buffer still has data
        while (m_bufferrecv.hasremaining())
        {
            // lost/incomplete packet: stop parsing for now
            if (cprotocolanalysis.ebagstatus.baglost == m_analysis.bagstatus) break;
            m_handle.process(m_bufferrecv, m_analysis, m_strsn); // parse data (junk-packet handling)
            // Register the client's uid the first time the parser reports one.
            if (string.isnullorempty(m_struid))
            {
                if (!string.isnullorempty(m_analysis.uid))
                {
                    m_struid = m_analysis.uid;
                    csingleton<cclientmgr>.getinstance().addclientuid(m_struid, m_strsn, this);
                }
            }
            if (m_analysis.whethertosend)
            {
                string data = cprotocolbase.getprotocol(m_analysis);
                sendrealtime(data);
            }
        }
        listenfordata(e);
    }
    catch (exception ex)
    {
        // NOTE(review): when an exception is caught here, listenfordata is
        // skipped, so this handler does not continue receiving — confirm
        // that this is intended.
        cloghelp.appendlog("[error] receiveasynccompleted,errmsg:" + ex.message);
    }
}