C#高性能TCP服务的多种实现方式
哎~~ 想想大部分园友应该对"高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是 《猴赛雷,C#编写TCP服务的花样姿势
哎~~ 想想大部分园友应该对 "高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是《猴赛雷,C#编写TCP服务的花样姿势!》。
本篇文章的主旨是使用 .NET/C# 实现 TCP 高性能服务的不同方式,包括但不限于如下内容:
- APM 方式,即 Asynchronous PRogramming Model
- TAP 方式,即 Task-based Asynchronous Pattern
- SAEA 方式,即 SocketAsyncEventArgs
- RIO 方式,即 Registered I/O
在 .NET/C# 中对于 Socket 的支持均是基于 Windows I/O Completion Ports 完成端口技术的封装,通过不同的 Non-Blocking 封装结构来满足不同的编程需求。以上方式均已在 Cowboy.Sockets 中有完整实现,并且 APM 和 TAP 方式已经在实际项目中应用。Cowboy.Sockets 还在不断的进化和完善中,如有任何问题请及时指正。
虽然有这么多种实现方式,但抽象的看,它们是一样一样的,用两个 Loop 即可描述:Accept Loop 和 Read Loop,如下图所示。(这里提及的 "Loop" 指的是一种循环方式,而非特指 while/for 等关键字。)
- 在任何 TCP Server 的实现中,一定存在一个 Accept Socket Loop,用于接收 Client 端的 Connect 请求以建立 TCP Connection。
- 在任何 TCP Server 的实现中,一定存在一个 Read Socket Loop,用于接收 Client 端 Write 过来的数据。
如果 Accept 循环阻塞,则会导致无法快速的建立连接,服务端 Pending Backlog 满,进而导致 Client 端收到 Connect Timeout 的异常。如果 Read 循环阻塞,则显然会导致无法及时收到 Client 端发过来的数据,进而导致 Client 端 Send Buffer 满,无法再发送数据。
从实现细节的角度看,能够导致服务阻塞的位置可能在:
- Accept 到新的 Socket,构建新的 Connection 需要分配各种资源,分配资源慢;
- Accept 到新的 Socket,没有及时触发下一次 Accept;
- Read 到新的 Buffer,判定 Payload 消息长度,判定过程长;
- Read 到新的 Buffer,发现 Payload 还没有收全,继续 Read,则可能会导致一次 Buffer Copy;
- Payload 接收完毕,进行 Serialization 转成可识别的 Protocol Message,序列化慢;
- 由 Business Module 来处理相应的 Protocol Message,处理过程慢;
1-2 涉及到 Accept 过程和 Connection 的建立过程,3-4 涉及到 ReceiveBuffer 的处理过程,5-6 涉及到应用逻辑侧的实现。
java 中著名的 Netty 网络库从 4.0 版本开始对于 Buffer 部分做了全新的尝试,采用了名叫 ByteBuf 的设计,实现 Buffer Zero Copy 以减少高并发条件下 Buffer 拷贝带来的性能损失和 GC 压力。DotNetty,Orleans ,Helios 等项目正在尝试在 C# 中进行类似的 ByteBuf 的实现。
APM 方式:TcpSocketServer
TcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 APM 的 BeginXXX 和 EndXXX 接口实现。
TcpSocketServer 中的 Accept Loop 指的就是,
- BeginAccept -> EndAccept-> BeginAccept -> EndAccept -> BeginAccept -> ...
每一个建立成功的 Connection 由 TcpSocketsession 来处理,所以 TcpSocketSession 中会包含 Read Loop,
- BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...
TcpSocketServer 通过暴露 Event 来实现 Connection 的建立与断开和数据接收的通知。
event EventHandlerClientConnected; event EventHandler ClientDisconnected; event EventHandler ClientDataReceived;
使用也是简单直接,直接订阅事件通知。
private static void StartServer() { _server = new TcpSocketServer(22222); _server.ClientConnected += server_ClientConnected; _server.ClientDisconnected += server_ClientDisconnected; _server.ClientDataReceived += server_ClientDataReceived; _server.Listen(); } static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session)); } static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session)); } static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e) { var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength); Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session)); Console.WriteLine(string.Format("{0}", text)); _server.Broadcast(Encoding.UTF8.GetBytes(text)); }
TAP 方式:AsyncTcpSocketServer
AsyncTcpSocketServer 的实现是基于 .NET Framework 自带的 TcpListener 和 TcpClient 的更进一步的封装,采用基于 TAP 的 async/await 的 XXXAsync 接口实现。
然而,实际上 XXXAsync 并没有创建什么神奇的效果,其内部实现只是将 APM 的方法转换成了 TAP 的调用方式。
//************* Task-based async public methods ************************* [HostProtection(ExternalThreading = true)] public TaskAcceptSocketAsync() { return Task .Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null); } [HostProtection(ExternalThreading = true)] public Task AcceptTcpClientAsync() { return Task .Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null); }
AsyncTcpSocketServer 中的 Accept Loop 指的就是,
while (IsListening) { var tcpClient = await _listener.AcceptTcpClientAsync(); }
每一个建立成功的 Connection 由 AsyncTcpSocketSession 来处理,所以 AsyncTcpSocketSession 中会包含 Read Loop,
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); }
为了将 async/await 异步到底,AsyncTcpSocketServer 所暴露的接口也同样是 Awaitable 的。
public interface IAsyncTcpSocketServerMessageDispatcher { Task OnSessionStarted(AsyncTcpSocketSession session); Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count); Task OnSessionClosed(AsyncTcpSocketSession session); }
使用时仅需将一个实现了该接口的对象注入到 AsyncTcpSocketServer 的构造函数中即可。
public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher { public async Task OnSessionStarted(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。
public AsyncTcpSocketServer( ipEndPoint listenedEndPoint, Funcbyte