欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于TCP异步Socket模型的介绍

程序员文章站 2023-12-13 10:49:40
tcp异步socket模型c#的tcp异步socket模型是通过begin-end模式实现的。例如提供beginconnect、beginaccept、 beginsend...

tcp异步socket模型
c#的tcp异步socket模型是通过begin-end模式实现的。例如提供beginconnect、beginaccept、 beginsend 和 beginreceive等。

复制代码 代码如下:

iasyncresult beginaccept(asynccallback callback, object state);

asynccallback回调在函数执行完毕后执行。state对象被用于在执行函数和回调函数间传输信息。
复制代码 代码如下:

socket socket = new socket(
                  addressfamily.internetwork,
                  sockettype.stream,
                  protocoltype.tcp);
ipendpoint iep = new ipendpoint(ipaddress.any, 8888);
socket.bind(iep);
socket.listen(5);
socket.beginaccept (new asynccallback(callbackaccept), socket);

private void callbackaccept(iasyncresult iar)
{
  socket server = (socket)iar.asyncstate;
  socket client = server.endaccept(iar);
}


则在accept一个tcpclient,需要维护tcpclient列表。
复制代码 代码如下:

private list<tcpclientstate> clients;

异步tcp服务器完整实现
复制代码 代码如下:

/// <summary>
   /// 异步tcp服务器
   /// </summary>
   public class asynctcpserver : idisposable
   {
     #region fields

     private tcplistener listener;
     private list<tcpclientstate> clients;
     private bool disposed = false;

     #endregion

     #region ctors

     /// <summary>
     /// 异步tcp服务器
     /// </summary>
     /// <param name="listenport">监听的端口</param>
     public asynctcpserver(int listenport)
       : this(ipaddress.any, listenport)
     {
     }

     /// <summary>
     /// 异步tcp服务器
     /// </summary>
     /// <param name="localep">监听的终结点</param>
     public asynctcpserver(ipendpoint localep)
       : this(localep.address, localep.port)
     {
     }

     /// <summary>
     /// 异步tcp服务器
     /// </summary>
     /// <param name="localipaddress">监听的ip地址</param>
     /// <param name="listenport">监听的端口</param>
     public asynctcpserver(ipaddress localipaddress, int listenport)
     {
       address = localipaddress;
       port = listenport;
       this.encoding = encoding.default;

       clients = new list<tcpclientstate>();

       listener = new tcplistener(address, port);
       listener.allownattraversal(true);
     }

     #endregion

     #region properties

     /// <summary>
     /// 服务器是否正在运行
     /// </summary>
     public bool isrunning { get; private set; }
     /// <summary>
     /// 监听的ip地址
     /// </summary>
     public ipaddress address { get; private set; }
     /// <summary>
     /// 监听的端口
     /// </summary>
     public int port { get; private set; }
     /// <summary>
     /// 通信使用的编码
     /// </summary>
     public encoding encoding { get; set; }

     #endregion

     #region server

     /// <summary>
     /// 启动服务器
     /// </summary>
     /// <returns>异步tcp服务器</returns>
     public asynctcpserver start()
     {
       if (!isrunning)
       {
         isrunning = true;
         listener.start();
         listener.beginaccepttcpclient(
           new asynccallback(handletcpclientaccepted), listener);
       }
       return this;
     }

     /// <summary>
     /// 启动服务器
     /// </summary>
     /// <param name="backlog">
     /// 服务器所允许的挂起连接序列的最大长度
     /// </param>
     /// <returns>异步tcp服务器</returns>
     public asynctcpserver start(int backlog)
     {
       if (!isrunning)
       {
         isrunning = true;
         listener.start(backlog);
         listener.beginaccepttcpclient(
           new asynccallback(handletcpclientaccepted), listener);
       }
       return this;
     }

     /// <summary>
     /// 停止服务器
     /// </summary>
     /// <returns>异步tcp服务器</returns>
     public asynctcpserver stop()
     {
       if (isrunning)
       {
         isrunning = false;
         listener.stop();

         lock (this.clients)
         {
           for (int i = 0; i < this.clients.count; i++)
           {
             this.clients[i].tcpclient.client.disconnect(false);
           }
           this.clients.clear();
         }

       }
       return this;
     }

     #endregion

     #region receive

     private void handletcpclientaccepted(iasyncresult ar)
     {
       if (isrunning)
       {
         tcplistener tcplistener = (tcplistener)ar.asyncstate;

         tcpclient tcpclient = tcplistener.endaccepttcpclient(ar);
         byte[] buffer = new byte[tcpclient.receivebuffersize];

         tcpclientstate internalclient
           = new tcpclientstate(tcpclient, buffer);
         lock (this.clients)
         {
           this.clients.add(internalclient);
           raiseclientconnected(tcpclient);
         }

         networkstream networkstream = internalclient.networkstream;
         networkstream.beginread(
           internalclient.buffer,
           0,
           internalclient.buffer.length,
           handledatagramreceived,
           internalclient);

         tcplistener.beginaccepttcpclient(
           new asynccallback(handletcpclientaccepted), ar.asyncstate);
       }
     }

     private void handledatagramreceived(iasyncresult ar)
     {
       if (isrunning)
       {
         tcpclientstate internalclient = (tcpclientstate)ar.asyncstate;
         networkstream networkstream = internalclient.networkstream;

         int numberofreadbytes = 0;
         try
         {
           numberofreadbytes = networkstream.endread(ar);
         }
         catch
         {
           numberofreadbytes = 0;
         }

         if (numberofreadbytes == 0)
         {
           // connection has been closed
           lock (this.clients)
           {
             this.clients.remove(internalclient);
             raiseclientdisconnected(internalclient.tcpclient);
             return;
           }
         }

         // received byte and trigger event notification
         byte[] receivedbytes = new byte[numberofreadbytes];
         buffer.blockcopy(
           internalclient.buffer, 0,
           receivedbytes, 0, numberofreadbytes);
         raisedatagramreceived(internalclient.tcpclient, receivedbytes);
         raiseplaintextreceived(internalclient.tcpclient, receivedbytes);

         // continue listening for tcp datagram packets
         networkstream.beginread(
           internalclient.buffer,
           0,
           internalclient.buffer.length,
           handledatagramreceived,
           internalclient);
       }
     }

     #endregion

     #region events

     /// <summary>
     /// 接收到数据报文事件
     /// </summary>
     public event eventhandler<tcpdatagramreceivedeventargs<byte[]>> datagramreceived;
     /// <summary>
     /// 接收到数据报文明文事件
     /// </summary>
     public event eventhandler<tcpdatagramreceivedeventargs<string>> plaintextreceived;

     private void raisedatagramreceived(tcpclient sender, byte[] datagram)
     {
       if (datagramreceived != null)
       {
         datagramreceived(this, new tcpdatagramreceivedeventargs<byte[]>(sender, datagram));
       }
     }

     private void raiseplaintextreceived(tcpclient sender, byte[] datagram)
     {
       if (plaintextreceived != null)
       {
         plaintextreceived(this, new tcpdatagramreceivedeventargs<string>(
           sender, this.encoding.getstring(datagram, 0, datagram.length)));
       }
     }

     /// <summary>
     /// 与客户端的连接已建立事件
     /// </summary>
     public event eventhandler<tcpclientconnectedeventargs> clientconnected;
     /// <summary>
     /// 与客户端的连接已断开事件
     /// </summary>
     public event eventhandler<tcpclientdisconnectedeventargs> clientdisconnected;

     private void raiseclientconnected(tcpclient tcpclient)
     {
       if (clientconnected != null)
       {
         clientconnected(this, new tcpclientconnectedeventargs(tcpclient));
       }
     }

     private void raiseclientdisconnected(tcpclient tcpclient)
     {
       if (clientdisconnected != null)
       {
         clientdisconnected(this, new tcpclientdisconnectedeventargs(tcpclient));
       }
     }

     #endregion

     #region send

     /// <summary>
     /// 发送报文至指定的客户端
     /// </summary>
     /// <param name="tcpclient">客户端</param>
     /// <param name="datagram">报文</param>
     public void send(tcpclient tcpclient, byte[] datagram)
     {
       if (!isrunning)
         throw new invalidprogramexception("this tcp server has not been started.");

       if (tcpclient == null)
         throw new argumentnullexception("tcpclient");

       if (datagram == null)
         throw new argumentnullexception("datagram");

       tcpclient.getstream().beginwrite(
         datagram, 0, datagram.length, handledatagramwritten, tcpclient);
     }

     private void handledatagramwritten(iasyncresult ar)
     {
       ((tcpclient)ar.asyncstate).getstream().endwrite(ar);
     }

     /// <summary>
     /// 发送报文至指定的客户端
     /// </summary>
     /// <param name="tcpclient">客户端</param>
     /// <param name="datagram">报文</param>
     public void send(tcpclient tcpclient, string datagram)
     {
       send(tcpclient, this.encoding.getbytes(datagram));
     }

     /// <summary>
     /// 发送报文至所有客户端
     /// </summary>
     /// <param name="datagram">报文</param>
     public void sendall(byte[] datagram)
     {
       if (!isrunning)
         throw new invalidprogramexception("this tcp server has not been started.");

       for (int i = 0; i < this.clients.count; i++)
       {
         send(this.clients[i].tcpclient, datagram);
       }
     }

     /// <summary>
     /// 发送报文至所有客户端
     /// </summary>
     /// <param name="datagram">报文</param>
     public void sendall(string datagram)
     {
       if (!isrunning)
         throw new invalidprogramexception("this tcp server has not been started.");

       sendall(this.encoding.getbytes(datagram));
     }

     #endregion

     #region idisposable members

     /// <summary>
     /// performs application-defined tasks associated with freeing,
     /// releasing, or resetting unmanaged resources.
     /// </summary>
     public void dispose()
     {
       dispose(true);
       gc.suppressfinalize(this);
     }

     /// <summary>
     /// releases unmanaged and - optionally - managed resources
     /// </summary>
     /// <param name="disposing"><c>true</c> to release
     /// both managed and unmanaged resources; <c>false</c>
     /// to release only unmanaged resources.</param>
     protected virtual void dispose(bool disposing)
     {
       if (!this.disposed)
       {
         if (disposing)
         {
           try
           {
             stop();

             if (listener != null)
             {
               listener = null;
             }
           }
           catch (socketexception ex)
           {
             exceptionhandler.handle(ex);
           }
         }

         disposed = true;
       }
     }

     #endregion
   }

使用举例
复制代码 代码如下:

class program
   {
     static asynctcpserver server;

     static void main(string[] args)
     {
       logfactory.assign(new consolelogfactory());

       server = new asynctcpserver(9999);
       server.encoding = encoding.utf8;
       server.clientconnected +=
         new eventhandler<tcpclientconnectedeventargs>(server_clientconnected);
       server.clientdisconnected +=
         new eventhandler<tcpclientdisconnectedeventargs>(server_clientdisconnected);
       server.plaintextreceived +=
         new eventhandler<tcpdatagramreceivedeventargs<string>>(server_plaintextreceived);
       server.start();

       console.writeline("tcp server has been started.");
       console.writeline("type something to send to client...");
       while (true)
       {
         string text = console.readline();
         server.sendall(text);
       }
     }

     static void server_clientconnected(object sender, tcpclientconnectedeventargs e)
     {
       logger.debug(string.format(cultureinfo.invariantculture,
         "tcp client {0} has connected.",
         e.tcpclient.client.remoteendpoint.tostring()));
     }

     static void server_clientdisconnected(object sender, tcpclientdisconnectedeventargs e)
     {
       logger.debug(string.format(cultureinfo.invariantculture,
         "tcp client {0} has disconnected.",
         e.tcpclient.client.remoteendpoint.tostring()));
     }

     static void server_plaintextreceived(object sender, tcpdatagramreceivedeventargs<string> e)
     {
       if (e.datagram != "received")
       {
         console.write(string.format("client : {0} --> ",
           e.tcpclient.client.remoteendpoint.tostring()));
         console.writeline(string.format("{0}", e.datagram));
         server.send(e.tcpclient, "server has received you text : " + e.datagram);
       }
     }
   }

上一篇:

下一篇: