欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  php教程

C#高性能TCP服务的多种实现方式

程序员文章站 2022-05-25 13:39:41
...

哎~~ 想想大部分园友应该对"高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是 《猴赛雷,C#编写TCP服务的花样姿势

哎~~ 想想大部分园友应该对 "高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是《猴赛雷,C#编写TCP服务的花样姿势!》

本篇文章的主旨是使用 .NET/C# 实现 TCP 高性能服务的不同方式,包括但不限于如下内容:

  • APM 方式,即 Asynchronous PRogramming Model
  • TAP 方式,即 Task-based Asynchronous Pattern
  • SAEA 方式,即 SocketAsyncEventArgs
  • RIO 方式,即 Registered I/O

在 .NET/C# 中对于 Socket 的支持均是基于 Windows I/O Completion Ports 完成端口技术的封装,通过不同的 Non-Blocking 封装结构来满足不同的编程需求。以上方式均已在 Cowboy.Sockets 中有完整实现,并且 APM 和 TAP 方式已经在实际项目中应用。Cowboy.Sockets 还在不断的进化和完善中,如有任何问题请及时指正。

虽然有这么多种实现方式,但抽象的看,它们是一样一样的,用两个 Loop 即可描述:Accept LoopRead Loop,如下图所示。(这里提及的 "Loop" 指的是一种循环方式,而非特指 while/for 等关键字。)

C#高性能TCP服务的多种实现方式

  • 在任何 TCP Server 的实现中,一定存在一个 Accept Socket Loop,用于接收 Client 端的 Connect 请求以建立 TCP Connection。
  • 在任何 TCP Server 的实现中,一定存在一个 Read Socket Loop,用于接收 Client 端 Write 过来的数据。

如果 Accept 循环阻塞,则会导致无法快速的建立连接,服务端 Pending Backlog 满,进而导致 Client 端收到 Connect Timeout 的异常。如果 Read 循环阻塞,则显然会导致无法及时收到 Client 端发过来的数据,进而导致 Client 端 Send Buffer 满,无法再发送数据。

从实现细节的角度看,能够导致服务阻塞的位置可能在:

  1. Accept 到新的 Socket,构建新的 Connection 需要分配各种资源,分配资源慢;
  2. Accept 到新的 Socket,没有及时触发下一次 Accept;
  3. Read 到新的 Buffer,判定 Payload 消息长度,判定过程长;
  4. Read 到新的 Buffer,发现 Payload 还没有收全,继续 Read,则可能会导致一次 Buffer Copy;
  5. Payload 接收完毕,进行 Serialization 转成可识别的 Protocol Message,序列化慢;
  6. 由 Business Module 来处理相应的 Protocol Message,处理过程慢;

1-2 涉及到 Accept 过程和 Connection 的建立过程,3-4 涉及到 ReceiveBuffer 的处理过程,5-6 涉及到应用逻辑侧的实现。

java 中著名的 Netty 网络库从 4.0 版本开始对于 Buffer 部分做了全新的尝试,采用了名叫 ByteBuf 的设计,实现 Buffer Zero Copy 以减少高并发条件下 Buffer 拷贝带来的性能损失和 GC 压力。DotNetty,Orleans ,Helios 等项目正在尝试在 C# 中进行类似的 ByteBuf 的实现。

APM 方式:TcpSocketServer

TcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 APM 的 BeginXXX 和 EndXXX 接口实现。

TcpSocketServer 中的 Accept Loop 指的就是,

  • BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...

每一个建立成功的 Connection 由 TcpSocketsession 来处理,所以 TcpSocketSession 中会包含 Read Loop,

  • BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...

TcpSocketServer 通过暴露 Event 来实现 Connection 的建立与断开和数据接收的通知。

  event EventHandler ClientConnected;
  event EventHandler ClientDisconnected;
  event EventHandler ClientDataReceived;

使用也是简单直接,直接订阅事件通知。

  private static void StartServer()
  {
      _server = new TcpSocketServer(22222);
      _server.ClientConnected += server_ClientConnected;
      _server.ClientDisconnected += server_ClientDisconnected;
      _server.ClientDataReceived += server_ClientDataReceived;
      _server.Listen();
  }
  
  static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session));
  }
  
  static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
  {
      Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session));
  }
  
  static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e)
  {
      var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
      Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
      Console.WriteLine(string.Format("{0}", text));
      _server.Broadcast(Encoding.UTF8.GetBytes(text));
  }

TAP 方式:AsyncTcpSocketServer

AsyncTcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 TAP 的 async/await 的 XXXAsync 接口实现。

然而,实际上 XXXAsync 并没有创建什么神奇的效果,其内部实现只是将 APM 的方法转换成了 TAP 的调用方式。

  //************* Task-based async public methods *************************
  [HostProtection(ExternalThreading = true)]
  public Task AcceptSocketAsync()
  {
      return Task.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null);
  }
  
  [HostProtection(ExternalThreading = true)]
  public Task AcceptTcpClientAsync()
  {
      return Task.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
  }

AsyncTcpSocketServer 中的 Accept Loop 指的就是,

  while (IsListening)
  {
      var tcpClient = await _listener.AcceptTcpClientAsync();
  }

每一个建立成功的 Connection 由 AsyncTcpSocketSession 来处理,所以 AsyncTcpSocketSession 中会包含 Read Loop,

  while (State == TcpSocketConnectionState.Connected)
  {
      int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
  }

为了将 async/await 异步到底,AsyncTcpSocketServer 所暴露的接口也同样是 Awaitable 的。

  public interface IAsyncTcpSocketServerMessageDispatcher
  {
      Task OnSessionStarted(AsyncTcpSocketSession session);
      Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
      Task OnSessionClosed(AsyncTcpSocketSession session);
  }

使用时仅需将一个实现了该接口的对象注入到 AsyncTcpSocketServer 的构造函数中即可。

  public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher
  {
      public async Task OnSessionStarted(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
          await Task.CompletedTask;
      }
  
      public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
      {
          var text = Encoding.UTF8.GetString(data, offset, count);
          Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
          Console.WriteLine(string.Format("{0}", text));
  
          await session.SendAsync(Encoding.UTF8.GetBytes(text));
      }
  
      public async Task OnSessionClosed(AsyncTcpSocketSession session)
      {
          Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
          await Task.CompletedTask;
      }
  }

当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。

  public AsyncTcpSocketServer(
      ipEndPoint listenedEndPoint,
      Funcbyte