高性能TcpServer - 3.命令通道(处理:掉包,粘包,垃圾包)
高性能tcpserver - 2.创建高性能socket服务器socketasynceventargs的实现(iocp)
高性能tcpserver - 3.命令通道(处理:掉包,粘包,垃圾包)
高性能tcpserver - 4.文件通道(处理:文件分包,支持断点续传)
处理原理:
每个client创建各自的byte[]数组,通过遍历每个字节的数据
1.判断包长,确定掉包;
2.判断解析完后byte数组是否还有未解析的数据,确定粘包;
3.判断包头,确定垃圾包;
缓存数据类
/// <summary>
/// 缓存数据类
/// </summary>
public class cbytebuffer
{
// 默认1k
int m_ibuffersize = 1024 * 1;
// 数据解析
byte[] m_abybuf;
int m_iposition = 0;
int m_irecvlength = 0;
bool bwaitrecvremain;// 数据未接收完等待接收
object m_lock = new object(); // 内部同步锁
public int position
{
get { return m_iposition; }
set { m_iposition = value; }
}
public int recvlength
{
get { return m_irecvlength; }
set { m_irecvlength = value; }
}
public bool waitrecvremain
{
get { return bwaitrecvremain; }
set { bwaitrecvremain = value; }
}
public cbytebuffer(int buffsize)
{
m_ibuffersize = buffsize;
m_abybuf = new byte[m_ibuffersize];
}
public int getposition()
{
return m_iposition;
}
public int getrecvlength()
{
return m_irecvlength;
}
public void put(socketasynceventargs e)
{
int ilength = e.bytestransferred;
if (m_irecvlength + ilength >= m_ibuffersize)
{
clear();
return;
}
lock (m_lock)
{
array.copy(e.buffer, e.offset, m_abybuf, m_irecvlength, ilength);
m_irecvlength += ilength;
}
}
public byte getbyte()
{
bwaitrecvremain = false;
if (m_iposition >= m_irecvlength)
{
bwaitrecvremain = true;
return 0;
}
byte byret;
lock (m_lock)
{
byret = m_abybuf[m_iposition];
}
m_iposition++;
return byret;
}
public byte[] getbytearray(int length)
{
bwaitrecvremain = false;
if (m_iposition + length > m_irecvlength)
{
bwaitrecvremain = true;
return null;
}
byte[] ret = new byte[length];
lock (m_lock)
{
array.copy(m_abybuf, m_iposition, ret, 0, length);
m_iposition += length;
}
return ret;
}
public bool hasremaining()
{
return m_iposition < m_irecvlength;
}
public int remaining()
{
return m_irecvlength - m_iposition;
}
public void clear()
{
m_iposition = 0;
m_irecvlength = 0;
bwaitrecvremain = false;
}
~cbytebuffer()
{
m_abybuf = null;
dispose(false);
}
protected virtual void dispose(bool disposing)
{
if (disposing)
{
gc.suppressfinalize(this);
}
}
public void dispose()
{
dispose(true);
}
}
协议解析类
public void process(cbytebuffer bbuffer, cprotocolanalysis analysis, string sn)
{
analysis.bagstatus = cprotocolanalysis.ebagstatus.bagnone;
analysis.whethertosend = false;
int iposition = bbuffer.position;
byte head1 = 0; byte head2 = 0; byte head3 = 0; byte head4 = 0; byte head5 = 0; byte head6 = 0; bool headok = false;
if (!bbuffer.hasremaining()) return;
while (bbuffer.hasremaining())
{
head1 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
if (head1 == head1)
{
iposition = bbuffer.position - 1;
head2 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
head3 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
head4 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
head5 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
head6 = bbuffer.getbyte(); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
if (head2 == head2 && head3 == head3 && head4 == head4 && head5 == head5 && head6 == head6)
{
headok = true;
break;
}
else
{
cloghelp.appendlog("error,unable to parse the data2:position=" + iposition.tostring() + ",index=" + (bbuffer.getposition()).tostring() + ",head2=" + head2.tostring());
}
}
else
{
cloghelp.appendlog("error,unable to parse the data1:position=" + iposition.tostring() + ",index=" + (bbuffer.getposition()).tostring() + ",head1=" + head1.tostring());
}
}
if (!bbuffer.hasremaining())
{
if (headok)
{
if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
}
return;
}
byte[] arrlen = bbuffer.getbytearray(4); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
int len = ccommonfunc.string2int(ccommonfunc.bytetostring(arrlen)); if (-1 == len) return;
byte[] source = bbuffer.getbytearray(len); if (!analysis.isremaindata(iposition, bbuffer, analysis)) return;
if (!bbuffer.hasremaining())
{
bbuffer.clear();
}
else
{
analysis.bagstatus = cprotocolanalysis.ebagstatus.bagstick;
}
// #watermeter-001#01##
string data = ccommonfunc.bytetostring(source);
if (null == data || 0 == data.length || data.length - 1 != data.lastindexof(split1))
{
return;
}
data = data.substring(1, data.length - 2);
string[] item = data.split(split1);
if (null == item || 4 != item.length)
{
return;
}
string uid = item[0];
string taskid = item[1];
int cmd = ccommonfunc.string2int(item[2]);
string content = item[3];
program.addmessage("r: [" + sn + "] cmd=" + cmd.tostring() + " data=" + data);
analysis.cmd = cmd;
analysis.uid = uid;
analysis.taskid = taskid;
if (cmd == 1 || cmd == 2 || cmd == 3 || cmd == 4 || cmd == 5 || cmd == 6 || cmd == 7)
{
analysis.whethertosend = true;
}
string softtype = "";
try
{
switch (cmd)
{
case 1:
analysis.msg = "ok";
break;
case 2:
analysis.msg = datetime.now.tostring("yyyy-mm-dd hh:mm:ss");
break;
case 3:
// htemp=0263#watermeter-001#1520557004#03#buildid=44@edmid=37@meterid=1228@senddate=2018-02-05 17:36:22@[{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}+{132,0.0000}]#
analysis.msg = "ok";
break;
case 4:
{
// 获取版本信息
softtype = content.split(split2)[1];
storagefile(softtype, system.windows.forms.application.startuppath + "\\test.zip");
analysis.msg = "2";// version
}
break;
case 5:
// 获取包数
{
softtype = content.split(split2)[1];
if (!dicsoft.containskey(softtype))
{
storagefile(softtype, system.windows.forms.application.startuppath + "\\test.zip");
}
// 获取包数
int count = 0;
filecut entity = null;
dicsoft.trygetvalue(softtype, out entity);
if (null != entity) count = entity.count;
analysis.msg = count.tostring();
}
break;
case 6:
// 执行更新动作
{
string[] items = content.split(split2);
softtype = items[1];
int downindex = ccommonfunc.string2int(items[2]);
if (!dicsoft.containskey(softtype))
{
analysis.msg = "error@" + softtype + " 未找到更新文件,请先获取包数";
}
else
{
filecut entity = null;
dicsoft.trygetvalue(softtype, out entity);
if (null != entity)
{
string filedata = "";
entity.data.trygetvalue(downindex, out filedata);
if (string.isnullorempty(filedata))
analysis.msg = "error@" + softtype + " 第" + downindex + "包的数据为空";
else
analysis.msg = filedata;
}
}
}
break;
case 7:
// 更新版本信息(update sql)
analysis.msg = "ok";
break;
}
}
catch (exception ex)
{
analysis.msg = "error@" + ex.message;
}
program.addmessage("s: [" + sn + "] cmd=" + cmd.tostring() + " data=" + analysis.msg);
}
测试效果
正常包
htemp=0026#meter-001#1533022506#01##
掉包(分两包发送)
htemp=0026#
meter-001#1533022506#01##
粘包(两包一起发送)
htemp=0026#meter-001#1533022506#01##htemp=0026#meter-001#1533022506#01##
上一篇: Android硬件编解码与软件编解码
下一篇: 深入理解TCP/IP应用层