
A Detailed Look at Encapsulating TCP Client State in C#

程序员文章站 2023-12-13 11:14:46

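A TCP client connecting to a TCP server can be in one of several application-level states:
1. The connection to the server has been established
2. The connection to the server has been closed
3. An exception has occurred on the connection to the server

The application can handle these states as its requirements dictate, for example:
1. Reconnect automatically after the connection is closed
2. Reconnect to a fallback address after the connection is closed
3. Report every state change as an alert
The TcpClient wrapper described in this article implements an event notification mechanism for these state changes.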
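
The reconnect options listed above boil down to two small decisions: whether another attempt is still allowed, and which address to try next. The following is a minimal sketch of those decisions only. `ReconnectPolicy`, `ShouldRetry` and `NextAddressIndex` are hypothetical names that do not appear in the article's code; the retry rule mirrors the counting used by the client class (give up once the number of failures exceeds the configured retry count).

```csharp
using System;

// Hypothetical helper, not part of the article's code.
public static class ReconnectPolicy
{
    // True while another connection attempt is allowed.
    // failedAttempts is the number of failures so far, including the latest one.
    public static bool ShouldRetry(int failedAttempts, int maxRetries)
    {
        return failedAttempts <= maxRetries;
    }

    // Index of the next address to try. Wraps around to the primary
    // address (index 0) after the last fallback address has been tried.
    public static int NextAddressIndex(int currentIndex, int addressCount)
    {
        if (addressCount <= 0)
            throw new ArgumentOutOfRangeException("addressCount");
        if (currentIndex < 0)
            throw new ArgumentOutOfRangeException("currentIndex");

        return (currentIndex + 1) % addressCount;
    }
}
```

A handler for the disconnected event could call `ShouldRetry` first and then use `NextAddressIndex` to pick the fallback address before connecting again.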

The code is as follows:

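   using System;
   using System.Globalization;
   using System.Net;
   using System.Net.Sockets;
   using System.Text;
   using System.Threading;

   // Note: ExceptionHandler and Logger are helper classes that this article does not list.

   /// <summary>
   /// Asynchronous TCP client
   /// </summary>
   public class AsyncTcpClient : IDisposable
   {
     #region Fields

     private TcpClient tcpClient;
     private bool disposed = false;
     // Number of connection attempts that have failed so far
     private int retries = 0;

     #endregion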

     #region Ctors

     /// <summary>
     /// Asynchronous TCP client
     /// </summary>
     /// <param name="remoteEP">Remote server endpoint</param>
     public AsyncTcpClient(IPEndPoint remoteEP)
       : this(new[] { remoteEP.Address }, remoteEP.Port)
     {
     }

     /// <summary>
     /// Asynchronous TCP client
     /// </summary>
     /// <param name="remoteEP">Remote server endpoint</param>
     /// <param name="localEP">Local client endpoint</param>
     public AsyncTcpClient(IPEndPoint remoteEP, IPEndPoint localEP)
       : this(new[] { remoteEP.Address }, remoteEP.Port, localEP)
     {
     }

     /// <summary>
     /// Asynchronous TCP client
     /// </summary>
     /// <param name="remoteIPAddress">Remote server IP address</param>
     /// <param name="remotePort">Remote server port</param>
     public AsyncTcpClient(IPAddress remoteIPAddress, int remotePort)
       : this(new[] { remoteIPAddress }, remotePort)
     {
     }

     /// <summary>
     /// Asynchronous TCP client
     /// </summary>
     /// <param name="remoteIPAddress">Remote server IP address</param>
     /// <param name="remotePort">Remote server port</param>
     /// <param name="localEP">Local client endpoint</param>
     public AsyncTcpClient(
       IPAddress remoteIPAddress, int remotePort, IPEndPoint localEP)
       : this(new[] { remoteIPAddress }, remotePort, localEP)
     {
     }

     /// <summary>
     /// Asynchronous TCP client
     /// </summary>
     /// <param name="remoteHostName">Remote server host name</param>
     /// <param name="remotePort">Remote server port</param>
     public AsyncTcpClient(string remoteHostName, int remotePort)
       : this(Dns.GetHostAddresses(remoteHostName), remotePort)
     {
     }

     /// <summary>
     /// Asynchronous TCP client
     /// </summary>
     /// <param name="remoteHostName">Remote server host name</param>
     /// <param name="remotePort">Remote server port</param>
     /// <param name="localEP">Local client endpoint</param>
     public AsyncTcpClient(
       string remoteHostName, int remotePort, IPEndPoint localEP)
       : this(Dns.GetHostAddresses(remoteHostName), remotePort, localEP)
     {
     }

     /// <summary>
     /// Asynchronous TCP client
     /// </summary>
     /// <param name="remoteIPAddresses">List of remote server IP addresses</param>
     /// <param name="remotePort">Remote server port</param>
     public AsyncTcpClient(IPAddress[] remoteIPAddresses, int remotePort)
       : this(remoteIPAddresses, remotePort, null)
     {
     }

     /// <summary>
     /// Asynchronous TCP client
     /// </summary>
     /// <param name="remoteIPAddresses">List of remote server IP addresses</param>
     /// <param name="remotePort">Remote server port</param>
     /// <param name="localEP">Local client endpoint</param>
     public AsyncTcpClient(
       IPAddress[] remoteIPAddresses, int remotePort, IPEndPoint localEP)
     {
       this.Addresses = remoteIPAddresses;
       this.Port = remotePort;
       this.LocalIPEndPoint = localEP;
       this.Encoding = Encoding.Default;

       // Bind to the given local endpoint, or let the system choose one
       if (this.LocalIPEndPoint != null)
       {
         this.tcpClient = new TcpClient(this.LocalIPEndPoint);
       }
       else
       {
         this.tcpClient = new TcpClient();
       }

       // Defaults: retry up to 3 times, waiting 5 seconds between attempts
       Retries = 3;
       RetryInterval = 5;
     }

     #endregion

     #region Properties

     /// <summary>
     /// Whether a connection to the server has been established
     /// </summary>
     public bool Connected
     {
       // The null checks keep this safe after the client has been closed or disposed
       get { return tcpClient != null && tcpClient.Client != null && tcpClient.Client.Connected; }
     }
     /// <summary>
     /// List of remote server IP addresses
     /// </summary>
     public IPAddress[] Addresses { get; private set; }
     /// <summary>
     /// Remote server port
     /// </summary>
     public int Port { get; private set; }
     /// <summary>
     /// Maximum number of connection retries
     /// </summary>
     public int Retries { get; set; }
     /// <summary>
     /// Interval between connection retries, in seconds
     /// </summary>
     public int RetryInterval { get; set; }
     /// <summary>
     /// Remote server endpoint (built from the first address in the list)
     /// </summary>
     public IPEndPoint RemoteIPEndPoint
     {
       get { return new IPEndPoint(Addresses[0], Port); }
     }
     /// <summary>
     /// Local client endpoint
     /// </summary>
     protected IPEndPoint LocalIPEndPoint { get; private set; }
     /// <summary>
     /// Encoding used for communication
     /// </summary>
     public Encoding Encoding { get; set; }

     #endregion

     #region Connect

     /// <summary>
     /// Connects to the server
     /// </summary>
     /// <returns>This asynchronous TCP client</returns>
     public AsyncTcpClient Connect()
     {
       if (!Connected)
       {
         // Start the async connect operation
         tcpClient.BeginConnect(
           Addresses, Port, HandleTcpServerConnected, tcpClient);
       }

       return this;
     }

     /// <summary>
     /// Closes the connection to the server
     /// </summary>
     /// <returns>This asynchronous TCP client</returns>
     public AsyncTcpClient Close()
     {
       if (Connected)
       {
         retries = 0;
         tcpClient.Close();
         RaiseServerDisconnected(Addresses, Port);
       }

       return this;
     }

     #endregion

     #region Receive

     private void HandleTcpServerConnected(IAsyncResult ar)
     {
       try
       {
         tcpClient.EndConnect(ar);
         RaiseServerConnected(Addresses, Port);
         retries = 0;
       }
       catch (Exception ex)
       {
         ExceptionHandler.Handle(ex);
         if (retries > 0)
         {
           Logger.Debug(string.Format(CultureInfo.InvariantCulture,
             "Connect to server with retry {0} failed.", retries));
         }

         // Compare the failure counter (field) with the configured limit (property)
         retries++;
         if (retries > Retries)
         {
           // We have failed to connect to all the IP addresses,
           // connection has failed overall.
           RaiseServerExceptionOccurred(Addresses, Port, ex);
           return;
         }
         else
         {
           Logger.Debug(string.Format(CultureInfo.InvariantCulture,
             "Waiting {0} seconds before retrying to connect to server.",
             RetryInterval));
           Thread.Sleep(TimeSpan.FromSeconds(RetryInterval));
           Connect();
           return;
         }
       }

       // We are connected successfully, so start the async read operation.
       byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
       tcpClient.GetStream().BeginRead(
         buffer, 0, buffer.Length, HandleDatagramReceived, buffer);
     }

     private void HandleDatagramReceived(IAsyncResult ar)
     {
       NetworkStream stream = null;

       int numberOfReadBytes = 0;
       try
       {
         // GetStream also throws once the client has been closed,
         // so it is kept inside the try block.
         stream = tcpClient.GetStream();
         numberOfReadBytes = stream.EndRead(ar);
       }
       catch
       {
         numberOfReadBytes = 0;
       }

       if (numberOfReadBytes == 0)
       {
         // Connection has been closed
         Close();
         return;
       }

       // Copy the received bytes and trigger the event notifications
       byte[] buffer = (byte[])ar.AsyncState;
       byte[] receivedBytes = new byte[numberOfReadBytes];
       Buffer.BlockCopy(buffer, 0, receivedBytes, 0, numberOfReadBytes);
       RaiseDatagramReceived(tcpClient, receivedBytes);
       RaisePlaintextReceived(tcpClient, receivedBytes);

       // Then start reading from the network again
       stream.BeginRead(
         buffer, 0, buffer.Length, HandleDatagramReceived, buffer);
     }

     #endregion

     #region Events

     /// <summary>
     /// Raised when a datagram is received
     /// </summary>
     public event EventHandler<TcpDatagramReceivedEventArgs<byte[]>> DatagramReceived;
     /// <summary>
     /// Raised when a datagram is received, with its content decoded as plain text
     /// </summary>
     public event EventHandler<TcpDatagramReceivedEventArgs<string>> PlaintextReceived;

     private void RaiseDatagramReceived(TcpClient sender, byte[] datagram)
     {
       // Copy the delegate first so a concurrent unsubscribe cannot null it
       var handler = DatagramReceived;
       if (handler != null)
       {
         handler(this,
           new TcpDatagramReceivedEventArgs<byte[]>(sender, datagram));
       }
     }

     private void RaisePlaintextReceived(TcpClient sender, byte[] datagram)
     {
       var handler = PlaintextReceived;
       if (handler != null)
       {
         handler(this,
           new TcpDatagramReceivedEventArgs<string>(
             sender, this.Encoding.GetString(datagram, 0, datagram.Length)));
       }
     }

     /// <summary>
     /// Raised when the connection to the server has been established
     /// </summary>
     public event EventHandler<TcpServerConnectedEventArgs> ServerConnected;
     /// <summary>
     /// Raised when the connection to the server has been closed
     /// </summary>
     public event EventHandler<TcpServerDisconnectedEventArgs> ServerDisconnected;
     /// <summary>
     /// Raised when an exception occurs on the connection to the server
     /// </summary>
     public event EventHandler<TcpServerExceptionOccurredEventArgs> ServerExceptionOccurred;

     private void RaiseServerConnected(IPAddress[] ipAddresses, int port)
     {
       var handler = ServerConnected;
       if (handler != null)
       {
         handler(this,
           new TcpServerConnectedEventArgs(ipAddresses, port));
       }
     }

     private void RaiseServerDisconnected(IPAddress[] ipAddresses, int port)
     {
       var handler = ServerDisconnected;
       if (handler != null)
       {
         handler(this,
           new TcpServerDisconnectedEventArgs(ipAddresses, port));
       }
     }

     private void RaiseServerExceptionOccurred(
       IPAddress[] ipAddresses, int port, Exception innerException)
     {
       var handler = ServerExceptionOccurred;
       if (handler != null)
       {
         handler(this,
           new TcpServerExceptionOccurredEventArgs(
             ipAddresses, port, innerException));
       }
     }

     #endregion

     #region Send

     /// <summary>
     /// Sends a datagram
     /// </summary>
     /// <param name="datagram">The datagram to send</param>
     public void Send(byte[] datagram)
     {
       if (datagram == null)
         throw new ArgumentNullException("datagram");

       if (!Connected)
       {
         RaiseServerDisconnected(Addresses, Port);
         // InvalidOperationException is the conventional type for "wrong state";
         // InvalidProgramException is meant for invalid IL or metadata.
         throw new InvalidOperationException(
           "This client has not connected to server.");
       }

       tcpClient.GetStream().BeginWrite(
         datagram, 0, datagram.Length, HandleDatagramWritten, tcpClient);
     }

     private void HandleDatagramWritten(IAsyncResult ar)
     {
       // Complete the async write started in Send
       ((TcpClient)ar.AsyncState).GetStream().EndWrite(ar);
     }

     /// <summary>
     /// Sends a datagram
     /// </summary>
     /// <param name="datagram">The datagram text, encoded with <see cref="Encoding"/></param>
     public void Send(string datagram)
     {
       Send(this.Encoding.GetBytes(datagram));
     }

     #endregion

     #region IDisposable Members

     /// <summary>
     /// Performs application-defined tasks associated with freeing,
     /// releasing, or resetting unmanaged resources.
     /// </summary>
     public void Dispose()
     {
       Dispose(true);
       GC.SuppressFinalize(this);
     }

     /// <summary>
     /// Releases unmanaged and - optionally - managed resources
     /// </summary>
     /// <param name="disposing"><c>true</c> to release both managed
     /// and unmanaged resources; <c>false</c>
     /// to release only unmanaged resources.
     /// </param>
     protected virtual void Dispose(bool disposing)
     {
       if (!this.disposed)
       {
         if (disposing)
         {
           try
           {
             Close();

             if (tcpClient != null)
             {
               // Release the socket even if it never connected
               tcpClient.Close();
               tcpClient = null;
             }
           }
           catch (SocketException ex)
           {
             ExceptionHandler.Handle(ex);
           }
         }

         disposed = true;
       }
     }

     #endregion
   }
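
The connect callback above waits between attempts with `Thread.Sleep`, which blocks the thread that runs the callback for the whole retry interval. As an alternative, here is a minimal sketch of the same retry counting with a non-blocking wait. It assumes .NET 4.5 or later (`async`/`await`, `Task.Delay`); `RetryHelper` and `RetryAsync` are hypothetical names and are not part of the article's class.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of the article's code.
public static class RetryHelper
{
    // Calls attempt() until it reports success or the retries are used up.
    // One initial attempt plus at most maxRetries retries are made, which
    // matches the counting in the connect callback above.
    public static async Task<bool> RetryAsync(
        Func<Task<bool>> attempt, int maxRetries, TimeSpan interval)
    {
        if (attempt == null)
            throw new ArgumentNullException("attempt");

        int failures = 0;
        while (true)
        {
            if (await attempt().ConfigureAwait(false))
                return true;

            failures++;
            if (failures > maxRetries)
                return false;

            // Wait without blocking a thread
            await Task.Delay(interval).ConfigureAwait(false);
        }
    }
}
```

The `attempt` delegate would wrap one connection attempt and return `false` when it fails; the caller raises the exception event when `RetryAsync` finally returns `false`.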

Usage example
The code is as follows:

using System;
using System.Globalization;
using System.Net;
using System.Text;

// Note: LogFactory, ConsoleLogFactory and Logger are helper classes that this article does not list.
class Program
   {
     static AsyncTcpClient client;

     static void Main(string[] args)
     {
       LogFactory.Assign(new ConsoleLogFactory());

       // For testing only; the local endpoint can be omitted to let the system choose the port
       IPEndPoint remoteEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9999);
       IPEndPoint localEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9998);
       client = new AsyncTcpClient(remoteEP, localEP);
       client.Encoding = Encoding.UTF8;
       client.ServerExceptionOccurred +=
         new EventHandler<TcpServerExceptionOccurredEventArgs>(client_ServerExceptionOccurred);
       client.ServerConnected +=
         new EventHandler<TcpServerConnectedEventArgs>(client_ServerConnected);
       client.ServerDisconnected +=
         new EventHandler<TcpServerDisconnectedEventArgs>(client_ServerDisconnected);
       client.PlaintextReceived +=
         new EventHandler<TcpDatagramReceivedEventArgs<string>>(client_PlaintextReceived);
       client.Connect();

       // Connect() is asynchronous; the ServerConnected event reports the actual result
       Console.WriteLine("TCP client is connecting to server.");
       Console.WriteLine("Type something to send to server...");
       while (true)
       {
         try
         {
           string text = Console.ReadLine();
           client.Send(text);
         }
         catch (Exception ex)
         {
           Console.WriteLine(ex.Message);
         }
       }
     }

     static void client_ServerExceptionOccurred(
       object sender, TcpServerExceptionOccurredEventArgs e)
     {
       Logger.Debug(string.Format(CultureInfo.InvariantCulture,
         "TCP server {0} exception occurred, {1}.",
         e.ToString(), e.Exception.Message));
     }

     static void client_ServerConnected(
       object sender, TcpServerConnectedEventArgs e)
     {
       Logger.Debug(string.Format(CultureInfo.InvariantCulture,
         "TCP server {0} has connected.", e.ToString()));
     }

     static void client_ServerDisconnected(
       object sender, TcpServerDisconnectedEventArgs e)
     {
       Logger.Debug(string.Format(CultureInfo.InvariantCulture,
         "TCP server {0} has disconnected.", e.ToString()));
     }

     static void client_PlaintextReceived(
       object sender, TcpDatagramReceivedEventArgs<string> e)
     {
       Console.Write(string.Format("Server : {0} --> ",
         e.TcpClient.Client.RemoteEndPoint.ToString()));
       Console.WriteLine(string.Format("{0}", e.Datagram));
     }
   }

Supplementary code
The code is as follows:

   /// <summary>
   /// Internal class to join the TCP client and buffer together
   /// for easy management in the server
   /// </summary>
   internal class TcpClientState
   {
     /// <summary>
     /// Constructor for a new client
     /// </summary>
     /// <param name="tcpClient">The TCP client</param>
     /// <param name="buffer">The byte array buffer</param>
     public TcpClientState(TcpClient tcpClient, byte[] buffer)
     {
       if (tcpClient == null)
         throw new ArgumentNullException("tcpClient");
       if (buffer == null)
         throw new ArgumentNullException("buffer");

       this.TcpClient = tcpClient;
       this.Buffer = buffer;
     }

     /// <summary>
     /// Gets the TCP client
     /// </summary>
     public TcpClient TcpClient { get; private set; }

     /// <summary>
     /// Gets the buffer.
     /// </summary>
     public byte[] Buffer { get; private set; }

     /// <summary>
     /// Gets the network stream
     /// </summary>
     public NetworkStream NetworkStream
     {
       get { return TcpClient.GetStream(); }
     }
   }

The code is as follows:

   /// <summary>
   /// Event arguments for "connection with a client has been established"
   /// </summary>
   public class TcpClientConnectedEventArgs : EventArgs
   {
     /// <summary>
     /// Event arguments for "connection with a client has been established"
     /// </summary>
     /// <param name="tcpClient">The client</param>
     public TcpClientConnectedEventArgs(TcpClient tcpClient)
     {
       if (tcpClient == null)
         throw new ArgumentNullException("tcpClient");

       this.TcpClient = tcpClient;
     }

     /// <summary>
     /// The client
     /// </summary>
     public TcpClient TcpClient { get; private set; }
   }

The code is as follows:

  /// <summary>
  /// Event arguments for "connection with a client has been closed"
  /// </summary>
  public class TcpClientDisconnectedEventArgs : EventArgs
  {
    /// <summary>
    /// Event arguments for "connection with a client has been closed"
    /// </summary>
    /// <param name="tcpClient">The client</param>
    public TcpClientDisconnectedEventArgs(TcpClient tcpClient)
    {
      if (tcpClient == null)
        throw new ArgumentNullException("tcpClient");

      this.TcpClient = tcpClient;
    }

    /// <summary>
    /// The client
    /// </summary>
    public TcpClient TcpClient { get; private set; }
  }


The code is as follows:

   /// <summary>
   /// Event arguments for "an exception occurred on the connection to the server"
   /// </summary>
   public class TcpServerExceptionOccurredEventArgs : EventArgs
   {
     /// <summary>
     /// Event arguments for "an exception occurred on the connection to the server"
     /// </summary>
     /// <param name="ipAddresses">List of server IP addresses</param>
     /// <param name="port">Server port</param>
     /// <param name="innerException">Inner exception</param>
     public TcpServerExceptionOccurredEventArgs(
       IPAddress[] ipAddresses, int port, Exception innerException)
     {
       if (ipAddresses == null)
         throw new ArgumentNullException("ipAddresses");

       this.Addresses = ipAddresses;
       this.Port = port;
       this.Exception = innerException;
     }

     /// <summary>
     /// List of server IP addresses
     /// </summary>
     public IPAddress[] Addresses { get; private set; }
     /// <summary>
     /// Server port
     /// </summary>
     public int Port { get; private set; }
     /// <summary>
     /// Inner exception
     /// </summary>
     public Exception Exception { get; private set; }

     /// <summary>
     /// Returns a <see cref="System.String"/> that represents this instance.
     /// </summary>
     /// <returns>
     /// A <see cref="System.String"/> that represents this instance.
     /// </returns>
     public override string ToString()
     {
       // Format: "addr1,addr2:port"
       string s = string.Empty;
       foreach (var item in Addresses)
       {
         s = s + item.ToString() + ',';
       }
       s = s.TrimEnd(',');
       s = s + ":" + Port.ToString(CultureInfo.InvariantCulture);

       return s;
     }
   }
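
The client class and the usage example also rely on `TcpServerConnectedEventArgs`, `TcpServerDisconnectedEventArgs` and `TcpDatagramReceivedEventArgs<T>`, which the article does not list. The sketch below shows one way they could look. It is an assumption modelled on the exception event arguments above and on how the usage example reads `e.TcpClient` and `e.Datagram`, not the author's original code.

```csharp
using System;
using System.Globalization;
using System.Net;
using System.Net.Sockets;

// Assumed shape: same address/port data as the exception event arguments.
public class TcpServerConnectedEventArgs : EventArgs
{
    public TcpServerConnectedEventArgs(IPAddress[] ipAddresses, int port)
    {
        if (ipAddresses == null)
            throw new ArgumentNullException("ipAddresses");

        Addresses = ipAddresses;
        Port = port;
    }

    public IPAddress[] Addresses { get; private set; }
    public int Port { get; private set; }

    // Format: "addr1,addr2:port"
    public override string ToString()
    {
        string[] parts = Array.ConvertAll(Addresses, a => a.ToString());
        return string.Join(",", parts) + ":"
            + Port.ToString(CultureInfo.InvariantCulture);
    }
}

// Assumed shape: carries the receiving client and the payload,
// either raw bytes (T = byte[]) or decoded text (T = string).
public class TcpDatagramReceivedEventArgs<T> : EventArgs
{
    public TcpDatagramReceivedEventArgs(TcpClient tcpClient, T datagram)
    {
        TcpClient = tcpClient;
        Datagram = datagram;
    }

    public TcpClient TcpClient { get; private set; }
    public T Datagram { get; private set; }
}
```

Under the same assumption, `TcpServerDisconnectedEventArgs` would have the same members as `TcpServerConnectedEventArgs`.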
