在C#中对TCP客户端的状态封装详解
程序员文章站
2023-12-13 11:14:46
TCP 客户端连接 TCP 服务器端有几种应用状态:1. 与服务器的连接已建立;2. 与服务器的连接已断开;3. 与服务器的连接发生异常。应用程序可按需求合理处理这些逻辑,比如:1. 连接断开……
TCP 客户端连接 TCP 服务器端有几种应用状态:
1.与服务器的连接已建立
2.与服务器的连接已断开
3.与服务器的连接发生异常
应用程序可按需求合理处理这些逻辑,比如:
1.连接断开后自动重连
2.连接断开后选择备用地址重连
3.所有状态变化上报告警
本文描述的 TcpClient 实现了状态变化的事件通知机制。
复制代码 代码如下:
/// <summary>
/// Asynchronous TCP client that raises events when the connection to the server is
/// established, closed or fails, and when data is received.
/// </summary>
/// <remarks>
/// Identifiers declared by this listing (the class, its members, the event-args types)
/// and the external <c>logger</c> / <c>exceptionhandler</c> helpers keep the lower-case
/// spelling used throughout the listing. .NET identifiers are written with their real
/// casing because C# is case-sensitive and the lower-cased forms do not resolve; for the
/// same reason the <see cref="IDisposable"/> members are spelled <c>Dispose</c>.
/// </remarks>
public class asynctcpclient : IDisposable
{
    #region fields

    private TcpClient tcpclient;
    private bool disposed = false;

    // Number of consecutive failed connect attempts. It must not share a name with the
    // public "retries" property (the limit): with both called "retries" the give-up test
    // compared a value with itself and could never be true.
    private int failedattempts = 0;

    #endregion

    #region ctors

    /// <summary>
    /// Creates a client for the given remote end point.
    /// </summary>
    /// <param name="remoteep">Remote server end point.</param>
    public asynctcpclient(IPEndPoint remoteep)
        : this(new[] { remoteep.Address }, remoteep.Port)
    {
    }

    /// <summary>
    /// Creates a client for the given remote end point, bound to a local end point.
    /// </summary>
    /// <param name="remoteep">Remote server end point.</param>
    /// <param name="localep">Local client end point.</param>
    public asynctcpclient(IPEndPoint remoteep, IPEndPoint localep)
        : this(new[] { remoteep.Address }, remoteep.Port, localep)
    {
    }

    /// <summary>
    /// Creates a client for the given remote address and port.
    /// </summary>
    /// <param name="remoteipaddress">Remote server IP address.</param>
    /// <param name="remoteport">Remote server port.</param>
    public asynctcpclient(IPAddress remoteipaddress, int remoteport)
        : this(new[] { remoteipaddress }, remoteport)
    {
    }

    /// <summary>
    /// Creates a client for the given remote address and port, bound to a local end point.
    /// </summary>
    /// <param name="remoteipaddress">Remote server IP address.</param>
    /// <param name="remoteport">Remote server port.</param>
    /// <param name="localep">Local client end point.</param>
    public asynctcpclient(
        IPAddress remoteipaddress, int remoteport, IPEndPoint localep)
        : this(new[] { remoteipaddress }, remoteport, localep)
    {
    }

    /// <summary>
    /// Creates a client for the given host name and port. The host name is resolved
    /// with <see cref="Dns.GetHostAddresses(string)"/> when the constructor runs.
    /// </summary>
    /// <param name="remotehostname">Remote server host name.</param>
    /// <param name="remoteport">Remote server port.</param>
    public asynctcpclient(string remotehostname, int remoteport)
        : this(Dns.GetHostAddresses(remotehostname), remoteport)
    {
    }

    /// <summary>
    /// Creates a client for the given host name and port, bound to a local end point.
    /// The host name is resolved with <see cref="Dns.GetHostAddresses(string)"/> when
    /// the constructor runs.
    /// </summary>
    /// <param name="remotehostname">Remote server host name.</param>
    /// <param name="remoteport">Remote server port.</param>
    /// <param name="localep">Local client end point.</param>
    public asynctcpclient(
        string remotehostname, int remoteport, IPEndPoint localep)
        : this(Dns.GetHostAddresses(remotehostname), remoteport, localep)
    {
    }

    /// <summary>
    /// Creates a client for the given list of remote addresses and port.
    /// </summary>
    /// <param name="remoteipaddresses">Remote server IP addresses.</param>
    /// <param name="remoteport">Remote server port.</param>
    public asynctcpclient(IPAddress[] remoteipaddresses, int remoteport)
        : this(remoteipaddresses, remoteport, null)
    {
    }

    /// <summary>
    /// Creates a client for the given list of remote addresses and port, optionally
    /// bound to a local end point. All other constructors delegate to this one.
    /// </summary>
    /// <param name="remoteipaddresses">Remote server IP addresses.</param>
    /// <param name="remoteport">Remote server port.</param>
    /// <param name="localep">Local client end point, or <c>null</c> to leave the
    /// underlying <see cref="TcpClient"/> unbound.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="remoteipaddresses"/> is <c>null</c>.
    /// </exception>
    public asynctcpclient(
        IPAddress[] remoteipaddresses, int remoteport, IPEndPoint localep)
    {
        if (remoteipaddresses == null)
            throw new ArgumentNullException("remoteipaddresses");

        this.addresses = remoteipaddresses;
        this.port = remoteport;
        this.localipendpoint = localep;
        this.encoding = Encoding.Default;

        if (this.localipendpoint != null)
        {
            this.tcpclient = new TcpClient(this.localipendpoint);
        }
        else
        {
            this.tcpclient = new TcpClient();
        }

        // Defaults for the public retry settings (the properties, not the counter).
        retries = 3;
        retryinterval = 5;
    }

    #endregion

    #region properties

    /// <summary>
    /// Whether a connection to the server is currently established.
    /// </summary>
    public bool connected
    {
        get
        {
            // The field is set to null by Dispose, and TcpClient.Client may be null
            // once the client has been closed, so neither can be dereferenced blindly.
            TcpClient current = tcpclient;
            if (current == null)
            {
                return false;
            }

            Socket socket = current.Client;
            return socket != null && socket.Connected;
        }
    }

    /// <summary>
    /// IP addresses of the remote server.
    /// </summary>
    public IPAddress[] addresses { get; private set; }

    /// <summary>
    /// Port of the remote server.
    /// </summary>
    public int port { get; private set; }

    /// <summary>
    /// Number of times a failed connect is retried before
    /// <see cref="serverexceptionoccurred"/> is raised. Set to 3 by the constructor.
    /// </summary>
    public int retries { get; set; }

    /// <summary>
    /// Delay, in seconds, between a failed connect and the next attempt.
    /// Set to 5 by the constructor.
    /// </summary>
    public int retryinterval { get; set; }

    /// <summary>
    /// Remote server end point, built from the first entry of <see cref="addresses"/>
    /// and <see cref="port"/>.
    /// </summary>
    public IPEndPoint remoteipendpoint
    {
        get { return new IPEndPoint(addresses[0], port); }
    }

    /// <summary>
    /// Local client end point passed to the constructor; <c>null</c> when none was given.
    /// </summary>
    protected IPEndPoint localipendpoint { get; private set; }

    /// <summary>
    /// Encoding used to convert between text and bytes in <see cref="send(string)"/>
    /// and <see cref="plaintextreceived"/>. Set to <see cref="Encoding.Default"/> by
    /// the constructor.
    /// </summary>
    public Encoding encoding { get; set; }

    #endregion

    #region connect

    /// <summary>
    /// Starts an asynchronous connect to the server if no connection is established.
    /// The call returns immediately; the outcome is reported through
    /// <see cref="serverconnected"/> or <see cref="serverexceptionoccurred"/>.
    /// </summary>
    /// <returns>This instance, to allow call chaining.</returns>
    /// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
    public asynctcpclient connect()
    {
        if (disposed)
            throw new ObjectDisposedException(GetType().FullName);

        if (!connected)
        {
            // start the async connect operation
            tcpclient.BeginConnect(
                addresses, port, handletcpserverconnected, tcpclient);
        }

        return this;
    }

    /// <summary>
    /// Closes the connection to the server, if one is established, and raises
    /// <see cref="serverdisconnected"/>.
    /// </summary>
    /// <remarks>
    /// The underlying <see cref="TcpClient"/> is closed and this class never creates a
    /// replacement, so the instance is not meant to be reconnected afterwards.
    /// </remarks>
    /// <returns>This instance, to allow call chaining.</returns>
    public asynctcpclient close()
    {
        if (connected)
        {
            failedattempts = 0;
            tcpclient.Close();
            raiseserverdisconnected(addresses, port);
        }

        return this;
    }

    #endregion

    #region receive

    /// <summary>
    /// Completion callback for <see cref="connect"/>: reports success and starts
    /// reading, or retries up to <see cref="retries"/> times before reporting failure.
    /// </summary>
    private void handletcpserverconnected(IAsyncResult ar)
    {
        if (disposed)
        {
            // Dispose ran while the connect was pending; there is nothing left to finish.
            return;
        }

        try
        {
            tcpclient.EndConnect(ar);
            raiseserverconnected(addresses, port);
            failedattempts = 0;
        }
        catch (Exception ex)
        {
            exceptionhandler.handle(ex);

            if (failedattempts > 0)
            {
                logger.debug(string.Format(CultureInfo.InvariantCulture,
                    "connect to server with retry {0} failed.", failedattempts));
            }

            failedattempts++;
            if (failedattempts > retries)
            {
                // we have failed to connect to all the ip addresses,
                // connection has failed overall.
                raiseserverexceptionoccurred(addresses, port, ex);
                return;
            }
            else
            {
                logger.debug(string.Format(CultureInfo.InvariantCulture,
                    "waiting {0} seconds before retrying to connect to server.",
                    retryinterval));

                // Blocks the callback thread for the whole retry interval.
                Thread.Sleep(TimeSpan.FromSeconds(retryinterval));
                connect();
                return;
            }
        }

        // we are connected successfully and start async read operation.
        byte[] buffer = new byte[tcpclient.ReceiveBufferSize];
        tcpclient.GetStream().BeginRead(
            buffer, 0, buffer.Length, handledatagramreceived, buffer);
    }

    /// <summary>
    /// Completion callback for a pending read: publishes the received bytes through
    /// <see cref="datagramreceived"/> and <see cref="plaintextreceived"/>, then starts
    /// the next read. A failed or zero-length read is treated as a closed connection.
    /// </summary>
    private void handledatagramreceived(IAsyncResult ar)
    {
        NetworkStream stream = null;
        int numberofreadbytes = 0;
        try
        {
            // GetStream is inside the try as well: when the client was closed while
            // this read was pending it throws, and that must end up in the
            // "connection closed" path instead of escaping from the callback.
            stream = tcpclient.GetStream();
            numberofreadbytes = stream.EndRead(ar);
        }
        catch
        {
            // Deliberate: any failure here is handled as "connection closed" below.
            numberofreadbytes = 0;
        }

        if (numberofreadbytes == 0)
        {
            // connection has been closed
            close();
            return;
        }

        // received byte and trigger event notification
        byte[] buffer = (byte[])ar.AsyncState;
        byte[] receivedbytes = new byte[numberofreadbytes];
        Buffer.BlockCopy(buffer, 0, receivedbytes, 0, numberofreadbytes);
        raisedatagramreceived(tcpclient, receivedbytes);
        raiseplaintextreceived(tcpclient, receivedbytes);

        // then start reading from the network again
        stream.BeginRead(
            buffer, 0, buffer.Length, handledatagramreceived, buffer);
    }

    #endregion

    #region events

    /// <summary>
    /// Raised when bytes have been received from the server.
    /// </summary>
    public event EventHandler<tcpdatagramreceivedeventargs<byte[]>> datagramreceived;

    /// <summary>
    /// Raised when bytes have been received from the server; the payload is the
    /// received bytes decoded with <see cref="encoding"/>.
    /// </summary>
    public event EventHandler<tcpdatagramreceivedeventargs<string>> plaintextreceived;

    private void raisedatagramreceived(TcpClient sender, byte[] datagram)
    {
        // Copy to a local so the delegate cannot become null between check and call.
        EventHandler<tcpdatagramreceivedeventargs<byte[]>> handler = datagramreceived;
        if (handler != null)
        {
            handler(this,
                new tcpdatagramreceivedeventargs<byte[]>(sender, datagram));
        }
    }

    private void raiseplaintextreceived(TcpClient sender, byte[] datagram)
    {
        EventHandler<tcpdatagramreceivedeventargs<string>> handler = plaintextreceived;
        if (handler != null)
        {
            handler(this,
                new tcpdatagramreceivedeventargs<string>(
                    sender, this.encoding.GetString(datagram, 0, datagram.Length)));
        }
    }

    /// <summary>
    /// Raised when the connection to the server has been established.
    /// </summary>
    public event EventHandler<tcpserverconnectedeventargs> serverconnected;

    /// <summary>
    /// Raised when the connection to the server has been closed.
    /// </summary>
    public event EventHandler<tcpserverdisconnectedeventargs> serverdisconnected;

    /// <summary>
    /// Raised when connecting to the server failed after all retries, or when an
    /// asynchronous write failed.
    /// </summary>
    public event EventHandler<tcpserverexceptionoccurredeventargs> serverexceptionoccurred;

    private void raiseserverconnected(IPAddress[] ipaddresses, int port)
    {
        EventHandler<tcpserverconnectedeventargs> handler = serverconnected;
        if (handler != null)
        {
            handler(this,
                new tcpserverconnectedeventargs(ipaddresses, port));
        }
    }

    private void raiseserverdisconnected(IPAddress[] ipaddresses, int port)
    {
        EventHandler<tcpserverdisconnectedeventargs> handler = serverdisconnected;
        if (handler != null)
        {
            handler(this,
                new tcpserverdisconnectedeventargs(ipaddresses, port));
        }
    }

    private void raiseserverexceptionoccurred(
        IPAddress[] ipaddresses, int port, Exception innerexception)
    {
        EventHandler<tcpserverexceptionoccurredeventargs> handler = serverexceptionoccurred;
        if (handler != null)
        {
            handler(this,
                new tcpserverexceptionoccurredeventargs(
                    ipaddresses, port, innerexception));
        }
    }

    #endregion

    #region send

    /// <summary>
    /// Starts an asynchronous write of the given bytes to the server.
    /// </summary>
    /// <param name="datagram">Bytes to send.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="datagram"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="InvalidProgramException">
    /// The client is not connected; <see cref="serverdisconnected"/> is raised first.
    /// </exception>
    public void send(byte[] datagram)
    {
        if (datagram == null)
            throw new ArgumentNullException("datagram");

        if (!connected)
        {
            raiseserverdisconnected(addresses, port);

            // NOTE(review): InvalidOperationException would be the conventional type
            // here; the original type is kept so existing catch blocks keep working.
            throw new InvalidProgramException(
                "this client has not connected to server.");
        }

        tcpclient.GetStream().BeginWrite(
            datagram, 0, datagram.Length, handledatagramwritten, tcpclient);
    }

    /// <summary>
    /// Completion callback for <see cref="send(byte[])"/>. A failed write is passed to
    /// <c>exceptionhandler</c> and reported through <see cref="serverexceptionoccurred"/>.
    /// </summary>
    private void handledatagramwritten(IAsyncResult ar)
    {
        try
        {
            ((TcpClient)ar.AsyncState).GetStream().EndWrite(ar);
        }
        catch (Exception ex)
        {
            // This is an AsyncCallback: the send() caller is not on the stack, so an
            // exception escaping from here could not be caught by application code.
            exceptionhandler.handle(ex);
            raiseserverexceptionoccurred(addresses, port, ex);
        }
    }

    /// <summary>
    /// Encodes the text with <see cref="encoding"/> and sends it to the server.
    /// </summary>
    /// <param name="datagram">Text to send.</param>
    public void send(string datagram)
    {
        send(this.encoding.GetBytes(datagram));
    }

    #endregion

    #region idisposable members

    /// <summary>
    /// Performs application-defined tasks associated with freeing,
    /// releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases unmanaged and - optionally - managed resources.
    /// </summary>
    /// <param name="disposing"><c>true</c> to release both managed
    /// and unmanaged resources; <c>false</c>
    /// to release only unmanaged resources.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
            {
                try
                {
                    // Raises serverdisconnected when a connection is still open.
                    close();

                    if (tcpclient != null)
                    {
                        // close() only closes a connected client, so close it here
                        // unconditionally before dropping the reference.
                        tcpclient.Close();
                        tcpclient = null;
                    }
                }
                catch (SocketException ex)
                {
                    exceptionhandler.handle(ex);
                }
            }

            disposed = true;
        }
    }

    #endregion
}
使用举例
复制代码 代码如下:
/// <summary>
/// Console sample: connects an <c>asynctcpclient</c> to 127.0.0.1:9999 and sends every
/// line typed on standard input to the server. Connection state changes are written
/// through <c>logger</c>; received text is written to the console.
/// </summary>
class program
{
    static asynctcpclient client;

    /// <summary>
    /// Wires up the client events, starts the connect and runs the send loop until
    /// standard input ends.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        logfactory.assign(new consolelogfactory());

        // For testing only: the local end point may be left out so that the system
        // chooses the port.
        IPEndPoint remoteep = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9999);
        IPEndPoint localep = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9998);

        client = new asynctcpclient(remoteep, localep);
        client.encoding = Encoding.UTF8;
        client.serverexceptionoccurred += client_serverexceptionoccurred;
        client.serverconnected += client_serverconnected;
        client.serverdisconnected += client_serverdisconnected;
        client.plaintextreceived += client_plaintextreceived;
        client.connect();

        // connect() only starts the asynchronous connect, so the connection is not
        // established yet at this point; success is reported by client_serverconnected.
        Console.WriteLine("tcp client is connecting to server...");
        Console.WriteLine("type something to send to server...");

        while (true)
        {
            try
            {
                string text = Console.ReadLine();
                if (text == null)
                {
                    // End of input: without this check every iteration would fail on
                    // the null text and the loop would spin forever.
                    break;
                }

                client.send(text);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }

    /// <summary>
    /// Logs a connection failure reported by the client.
    /// </summary>
    static void client_serverexceptionoccurred(
        object sender, tcpserverexceptionoccurredeventargs e)
    {
        logger.debug(string.Format(CultureInfo.InvariantCulture,
            "tcp server {0} exception occurred, {1}.",
            e.ToString(), e.exception.Message));
    }

    /// <summary>
    /// Logs that the connection to the server has been established.
    /// </summary>
    static void client_serverconnected(
        object sender, tcpserverconnectedeventargs e)
    {
        logger.debug(string.Format(CultureInfo.InvariantCulture,
            "tcp server {0} has connected.", e.ToString()));
    }

    /// <summary>
    /// Logs that the connection to the server has been closed.
    /// </summary>
    static void client_serverdisconnected(
        object sender, tcpserverdisconnectedeventargs e)
    {
        logger.debug(string.Format(CultureInfo.InvariantCulture,
            "tcp server {0} has disconnected.", e.ToString()));
    }

    /// <summary>
    /// Prints text received from the server, prefixed with the server's end point.
    /// </summary>
    static void client_plaintextreceived(
        object sender, tcpdatagramreceivedeventargs<string> e)
    {
        Console.Write("server : {0} --> ",
            e.tcpclient.Client.RemoteEndPoint.ToString());
        Console.WriteLine("{0}", e.datagram);
    }
}
补充代码
复制代码 代码如下:
/// <summary>
/// Internal class to join the tcp client and buffer together
/// for easy management in the server.
/// </summary>
/// <remarks>
/// Names declared by this listing keep its lower-case spelling; .NET types are written
/// with their real casing because the lower-cased forms do not resolve in C#.
/// </remarks>
internal class tcpclientstate
{
    /// <summary>
    /// Constructor for a new client.
    /// </summary>
    /// <param name="tcpclient">The tcp client.</param>
    /// <param name="buffer">The byte array buffer.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="tcpclient"/> or <paramref name="buffer"/> is <c>null</c>.
    /// </exception>
    public tcpclientstate(TcpClient tcpclient, byte[] buffer)
    {
        if (tcpclient == null)
            throw new ArgumentNullException("tcpclient");
        if (buffer == null)
            throw new ArgumentNullException("buffer");

        this.tcpclient = tcpclient;
        this.buffer = buffer;
    }

    /// <summary>
    /// Gets the tcp client.
    /// </summary>
    public TcpClient tcpclient { get; private set; }

    /// <summary>
    /// Gets the buffer.
    /// </summary>
    public byte[] buffer { get; private set; }

    /// <summary>
    /// Gets the network stream of <see cref="tcpclient"/>; each access calls
    /// <see cref="TcpClient.GetStream"/>.
    /// </summary>
    public NetworkStream networkstream
    {
        get { return tcpclient.GetStream(); }
    }
}
复制代码 代码如下:
/// <summary>
/// Event arguments for the event signalling that a connection with a client
/// has been established.
/// </summary>
/// <remarks>
/// Names declared by this listing keep its lower-case spelling; .NET types are written
/// with their real casing because the lower-cased forms do not resolve in C#.
/// </remarks>
public class tcpclientconnectedeventargs : EventArgs
{
    /// <summary>
    /// Creates the event arguments for the given client.
    /// </summary>
    /// <param name="tcpclient">The client.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="tcpclient"/> is <c>null</c>.
    /// </exception>
    public tcpclientconnectedeventargs(TcpClient tcpclient)
    {
        if (tcpclient == null)
            throw new ArgumentNullException("tcpclient");

        this.tcpclient = tcpclient;
    }

    /// <summary>
    /// The client.
    /// </summary>
    public TcpClient tcpclient { get; private set; }
}
复制代码 代码如下:
/// <summary>
/// Event arguments for the event signalling that the connection with a client
/// has been closed.
/// </summary>
/// <remarks>
/// Names declared by this listing keep its lower-case spelling; .NET types are written
/// with their real casing because the lower-cased forms do not resolve in C#.
/// </remarks>
public class tcpclientdisconnectedeventargs : EventArgs
{
    /// <summary>
    /// Creates the event arguments for the given client.
    /// </summary>
    /// <param name="tcpclient">The client.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="tcpclient"/> is <c>null</c>.
    /// </exception>
    public tcpclientdisconnectedeventargs(TcpClient tcpclient)
    {
        if (tcpclient == null)
            throw new ArgumentNullException("tcpclient");

        this.tcpclient = tcpclient;
    }

    /// <summary>
    /// The client.
    /// </summary>
    public TcpClient tcpclient { get; private set; }
}
复制代码 代码如下:
/// <summary>
/// Event arguments for the event signalling that an exception occurred on the
/// connection to the server.
/// </summary>
/// <remarks>
/// Names declared by this listing keep its lower-case spelling; .NET identifiers are
/// written with their real casing because the lower-cased forms do not resolve in C#.
/// In particular the override must be spelled <c>ToString</c>.
/// </remarks>
public class tcpserverexceptionoccurredeventargs : EventArgs
{
    /// <summary>
    /// Creates the event arguments for the given server and exception.
    /// </summary>
    /// <param name="ipaddresses">Server IP addresses.</param>
    /// <param name="port">Server port.</param>
    /// <param name="innerexception">The exception that occurred.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="ipaddresses"/> is <c>null</c>.
    /// </exception>
    public tcpserverexceptionoccurredeventargs(
        IPAddress[] ipaddresses, int port, Exception innerexception)
    {
        if (ipaddresses == null)
            throw new ArgumentNullException("ipaddresses");

        this.addresses = ipaddresses;
        this.port = port;
        this.exception = innerexception;
    }

    /// <summary>
    /// Server IP addresses.
    /// </summary>
    public IPAddress[] addresses { get; private set; }

    /// <summary>
    /// Server port.
    /// </summary>
    public int port { get; private set; }

    /// <summary>
    /// The exception that occurred.
    /// </summary>
    public Exception exception { get; private set; }

    /// <summary>
    /// Returns a <see cref="System.String"/> that represents this instance.
    /// </summary>
    /// <returns>
    /// The addresses separated by commas, followed by a colon and the port,
    /// for example <c>127.0.0.1,10.0.0.2:9999</c>.
    /// </returns>
    public override string ToString()
    {
        // Join once instead of concatenating in a loop and trimming the last comma.
        string[] parts = Array.ConvertAll(
            addresses, delegate(IPAddress address) { return address.ToString(); });

        return string.Join(",", parts)
            + ":" + port.ToString(CultureInfo.InvariantCulture);
    }
}
推荐阅读