.NET Standard中使用TCPListener和TCPClient的高性能TCP客户端服务器
目录
介绍
随着即将出现的.NET 5和需要从.NET 4.8和.NET Core迁移的人,此源代码旨在提供一个示例,说明如何通过本机TCP建立高性能的跨平台Client Server消息交换。符合.NET Standard,而无需与.NET Framework或.NET Core或其他任何特定关系。
此外,它还解决了TCP会话遇到的一个常见问题:消息自旋锁问题和异步内存泄漏问题和/或CancellationToken异常问题,这些问题在TCP Client Server实现中都很常见。
TCP Server
为了简单起见,我们将使用CLI项目,项目类型本身可以是.NET 5或.NET Core或.NET Framework。客户端和服务器均以.NET Standard语法编码,因此它们可以与所有这三个无缝连接。
Main()服务器主机的典型CLI 代码块:
using System;
public static class Program
{
public static void Main()
{
Console.WriteLine("Press esc key to stop");
int i = 0;
void PeriodicallyClearScreen()
{
i++;
if (i > 15)
{
Console.Clear();
Console.WriteLine("Press esc key to stop");
i = 0;
}
}
//Write the host messages to the console
void OnHostMessage(string input)
{
PeriodicallyClearScreen();
Console.WriteLine(input);
}
var BLL = new ServerHost.Host(OnHostMessage);
BLL.RunServerThread();
while (Console.ReadKey().Key != ConsoleKey.Escape)
{
Console.Clear();
Console.WriteLine("Press esc key to stop");
}
Console.WriteLine("Attempting clean exit");
BLL.WaitForServerThreadToStop();
Console.WriteLine("Exiting console Main.");
}
}
基本的CLI管道在这里,没什么异常。
Esc键退出“客户端”窗口,每隔15条消息就会清除该窗口(仅用于调试/演示目的)。不要在生产版本中将实际的网络消息写入控制台。
此代码块的唯一关键之处在于,为了保持高性能,客户端和服务器托管在专用线程中。也就是说,与执行Main块的线程分开。该功能包含在RunServerThread()函数中。
为此,我们将创建一个Host类并将其添加到.NET Standard库项目类型中。.NET5,.NET Framework和.NET Core项目可以引用.NET Standard库,因此它确实是最灵活的选择在撰写本文时。
向其添加以下代码:
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
public class Host
{
#region Public Functions
public virtual void RunServerThread()
{
this.ServerThread.Start();
this.OnHostMessages.Invoke("Server started");
}
public virtual void WaitForServerThreadToStop()
{
this.Server.ExitSignal = true;
this.OnHostMessages.Invoke("Exit Signal sent to server thread");
this.OnHostMessages.Invoke("Joining server thread");
this.ServerThread.Join();
this.OnHostMessages.Invoke("Server thread has exited gracefully");
}
#endregion
}
该RunServerThread()函数启动将运行服务器的Thread。
该WaitForServerThreadToStop()函数向服务器线程发出信号,通知它应该在尽可能快的时间正常退出,然后我们将服务器线程加入到调用线程(Main()在本例中为线程),否则CLI窗口只会终止/中止,而我们不会不想从清理/异常处理的角度做;优美/干净的退出是可取的。
将支持变量和构造方法添加到Server Host类中:
#region Public Delegates
public delegate void HostMessagesDelegate(string message);
#endregion
#region Variables
protected readonly StandardServer.Server Server;
protected readonly Thread ServerThread;
#region Callbacks
protected readonly HostMessagesDelegate OnHostMessages;
#endregion
#endregion
#region Constructor
public Host(HostMessagesDelegate onHostMessages)
{
this.OnHostMessages = onHostMessages ??
throw new ArgumentNullException(nameof(onHostMessages));
this.Server = new StandardServer.Server(this.OnMessage, this.ConnectionHandler);
this.ServerThread = new Thread(this.Server.Run);
}
#endregion
#region Protected Functions
protected virtual void OnMessage(string message)
{
this.OnHostMessages.Invoke(message);
}
protected virtual void ConnectionHandler(NetworkStream connectedAutoDisposedNetStream)
{
}
#endregion
- ServerThread:托管服务器的线程(Server类)
- OnHostMessages和 OnMessage:仅用于演示目的,将消息从TCP连接器线程推送到客户端CLI窗口。(注意:对于WinForm应用程序,如果要向最终用户显示消息,则必须跨GUI线程执行ISynchronizeInvoke操作。Console.Write没有此限制。)
- ConnectionHandler:我们将回到这一点。
- Server:我们将要编写的TCP Server类代码。
创建一个名为Server的类,并向其中添加以下代码:
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
public class Server
{
#region Public Functions
public virtual void Run()
{
if (this.IsRunning)
return; //Already running, only one running instance allowed.
this.IsRunning = true;
this.ExitSignal = false;
while (!this.ExitSignal)
this.ConnectionLooper();
this.IsRunning = false;
}
#endregion
}
由于Run()是在专用线程中运行,因此您可能需要添加一个Try Catch块来调试所有未处理的异常,并根据需要添加日志记录。为了清楚起见,我将由您自己决定。
Run()函数处理ExitSignal检查,并将其余的TCP逻辑推入ConnectionLooper()函数。此时,您可能会认为这是一个无限的自旋循环,您是正确的,但我们将使用await来解决这个问题。
将支持变量添加Constructor到Server类中:
#region Public Properties
private volatile bool _ExitSignal;
public virtual bool ExitSignal
{
get => this._ExitSignal;
set => this._ExitSignal = value;
}
#endregion
#region Public Delegates
public delegate void ConnectionHandlerDelegate(NetworkStream connectedAutoDisposedNetStream);
public delegate void MessageDelegate(string message);
#endregion
#region Variables
#region Init/State
protected readonly int AwaiterTimeoutInMS;
protected readonly string Host;
protected readonly int Port;
protected readonly int MaxConcurrentListeners;
protected readonly TcpListener Listener;
protected bool IsRunning;
protected List<Task> TcpClientTasks = new List<Task>();
#endregion
#region Callbacks
protected readonly ConnectionHandlerDelegate OnHandleConnection;
protected readonly MessageDelegate OnMessage;
#endregion
#endregion
#region Constructor
public Server(
MessageDelegate onMessage,
ConnectionHandlerDelegate connectionHandler,
string host = "0.0.0.0",
int port = 8080,
int maxConcurrentListeners = 10,
int awaiterTimeoutInMS = 500
)
{
this.OnMessage = onMessage ?? throw new ArgumentNullException(nameof(onMessage));
this.OnHandleConnection = connectionHandler ??
throw new ArgumentNullException(nameof(connectionHandler));
this.Host = host ?? throw new ArgumentNullException(nameof(host));
this.Port = port;
this.MaxConcurrentListeners = maxConcurrentListeners;
this.AwaiterTimeoutInMS = awaiterTimeoutInMS;
this.Listener = new TcpListener(IPAddress.Parse(this.Host), this.Port);
}
#endregion
这里没有什么幻想,唯一需要注意的是,_ExitSignal成员变量的类型为volatile,这有助于防止CLI Main()线程与服务器主机线程之间的陈旧获取/设置。对于演示目的,这比Lock或Mutex更为简单,并且可能占用较少的CPU/内存。
IP 0.0.0.0是IPAddress Any。您可以根据需要更改或删除默认值。
在这里,我们保持TcpClient连接任务(异步任务)的List,这可能是大小(maxConcurrentListeners)的数组代替List。如果这样做,它的运行速度可能会快几微秒。
OnMessage 仅用于演示目的,以在CLI窗口中显示消息。
OnHandleConnection 是回调,此类的使用者将在该回调中编码其业务案例专用的网络逻辑。
添加以下代码以实现ConnectionLooper()函数:
#region Protected Functions
protected virtual void ConnectionLooper()
{
while (this.TcpClientTasks.Count < this.MaxConcurrentListeners)
{
var AwaiterTask = Task.Run(async () =>
{
this.ProcessMessagesFromClient(await this.Listener.AcceptTcpClientAsync());
});
this.TcpClientTasks.Add(AwaiterTask);
}
int RemoveAtIndex = Task.WaitAny(this.TcpClientTasks.ToArray(), this.AwaiterTimeoutInMS);
if (RemoveAtIndex > 0)
this.TcpClientTasks.RemoveAt(RemoveAtIndex);
}
#endregion
在这里,我们的服务器正在异步侦听大量TCP连接(仅限于int MaxConcurrentListeners),这可以防止内部.NET ThreadPool线程耗尽。这取决于vm/server/host上的CPU内核数。您的主机越强大,您将能够支持更多的并发监听器。
Task await建立一个线程的延续,它进行到ProcessMessagesFromClient当客户端连接成功,该函数是其中发生所述网络通信的实际处理。
然后WaitAny(),我们在这些等待的任务列表中,但是对于给定的毫秒数,这部分才是关键。如果没有超时,我们将不会检测到ExitSignal变量的更改,而这是在多线程环境中正常退出的关键。WaitAny防止自旋锁定,同时还检测退出的TCPListener Tasks (.NET内部ThreadPool线程)。
它还避免了内存泄漏无限/过多的等待,线程或任务,同时仍异步处理连接。
如果任务中止,则抛出AcceptTcpClientAsync异常,您可能需要为每个任务提供一个CancellationToken,并在您要求正常退出的任务列表中的每个任务。由于我们从不调用Listener.Stop(),因此当主机Thread正常退出时,此示例代码中的.NET GC在内部进行了清理。
当WaitAny检测到某个任务已完成时,将返回RemoveAt索引,以便我们可以简单地从列表中删除该任务,在执行ExitSignal检查后的下一个遍历中重新添加一个新任务。
添加以下代码以实现该ProcessMessagesFromClient()函数:
protected virtual void ProcessMessagesFromClient(TcpClient Connection)
{
using (Connection)
{
if (!Connection.Connected)
return;
using (var netstream = Connection.GetStream())
{
this.OnHandleConnection.Invoke(netstream);
}
}
}
ThreadPool成功连接后,此函数在连续线程中运行,通常不会与ServerThreador Main()线程相同。如果您需要严格执行,可以添加ConfigureAwait(false)。
在TCPClient和NetworkStream使用Using语法自动关闭和释放,这样,这个类的消费者并不需要关注自身与清理。
OnHandleConnection.Invoke本质上从我们现在会写的Host类中调用ConnectionHandler()函数:
protected virtual void ConnectionHandler(NetworkStream connectedAutoDisposedNetStream)
{
if (!connectedAutoDisposedNetStream.CanRead && !connectedAutoDisposedNetStream.CanWrite)
return; //We need to be able to read and write
var writer = new StreamWriter(connectedAutoDisposedNetStream) { AutoFlush = true };
var reader = new StreamReader(connectedAutoDisposedNetStream);
var StartTime = DateTime.Now;
int i = 0;
while (!this.Server.ExitSignal) //Tight network message-loop (optional)
{
var JSON_Helper = new Helper.JSON();
string JSON = JSON_Helper.JSONstring();
string Response;
try //Communication block
{
//Synchronously send some JSON to the connected client
writer.WriteLine(JSON);
//Synchronously wait for a response from the connected client
Response = reader.ReadLine();
}
catch (IOException ex)
{
_ = ex; //Add Debug breakpoint and logging here
return; //Swallow exception and Exit function on network error
}
//Put breakpoint here to inspect the JSON string return by the connected client
Helper.SomeDataObject Data = JSON_Helper.DeserializeFromJSON(Response);
_ = Data;
//Update stats
i++;
var ElapsedTime = DateTime.Now - StartTime;
if (ElapsedTime.TotalMilliseconds >= 1000)
{
this.OnHostMessages.Invoke("Messages per second: " + i);
i = 0;
StartTime = DateTime.Now;
}
}
}
这主要是您自己编写的样板网络消息传递代码。如果您只需要推送1条消息,然后删除while-loop,将ExitSignal检查保持为if-statement不会受到损害,则可以根据需要多次检查volatile bool。
网络流支持字符串行和二进制字节数组传输。
在这个特定的演示中,我们将一个对象序列化和反序列化为JSON XML作为string,并通过Write/Read-Line来回发送(如果需要其他字符集,则.NET会将其自动转换为ASCII字符数组bytes[]。有几个转换助手。)
这里的一个主题是,如果由于某种原因连接丢失,则对流的写入和读取操作可能会抛出一个IOException,您将需要在Try... Catch块中进行处理。如果您需要处理或冒泡异常,可以使用throw;代替return;
如前所述,当Thread或任务继续完成或中止/抛出时,该Usings块将清除网络连接。
通常,我从不使用throw ex;因为它抛弃了最顶部的异常信息,其中作为throw;维护完整的堆栈跟踪/内部异常。
TCP客户端
TCP客户端的CLI和Host类实际上与TCP Server相同,完整版本已附加并可以下载。
下面显示的唯一区别是客户端使用TCP Client而不是TCP Listener,并且该ConnectionLooper()函数更简单,因为我们不处理多个并发的入站连接。
using System;
using System.Net.Sockets;
using System.Threading;
#region Public Functions
public virtual void Run()
{
if (this.IsRunning)
return; //Already running, only one running instance allowed.
this.IsRunning = true;
this.ExitSignal = false;
while (!this.ExitSignal)
this.ConnectionLooper();
this.IsRunning = false;
}
#endregion
#region Protected Functions
protected virtual void ConnectionLooper()
{
this.OnMessage.Invoke("Attempting server connection... ");
using (var Client = new TcpClient())
{
try
{
Client.Connect(this.Host, this.Port);
}
catch(SocketException ex)
{
this.OnMessage.Invoke(ex.Message);
//Server is unavailable, wait before re-trying
Thread.Sleep(this.ConnectionAttemptDelayInMS);
return; //Swallow exception
}
if (!Client.Connected) //exit function if not connected
return;
using (var netstream = Client.GetStream())
{
//Process the connection
this.OnHandleConnection.Invoke(netstream);
}
}
}
#endregion
在这里,我们仅使用线程休眠,如果您将客户端托管在任务(ThreadPool管理器)中,则Task.Delay是首选。
如果您不需要外部设备ExitSignal来正常退出,请务必删除它,因为这样做可以简化实现,即,您的流程在准备好退出时可以自行退出。我仅将其添加到演示示例中,以说明如何为需要该功能的用户完成此操作。
如果仅进行单个连接或间歇连接,则不需要包含While循环。在此示例中,我们无限期地等待服务器上线,如果它们不同,则需要根据您的要求进行自定义。
此外,如果您需要通过同一进程建立多个并发客户端连接,那么您也可以这样做,在这种情况下,您一定要使用与Server类类似的设计,该设计利用了可等待的WaitAny()异步TaskList模式,而不仅仅是一个此示例代码中显示了简单的同步Connect()循环。
结论
希望有人发现此代码有用。
仅使用一个核心VM,我就可以实现多达74000个双向JSON交换,因此性能应该不错。在此示例情况下,.NET4.8中的服务器和Core3.1中的客户端用于主机。
如果您不序列化对象,则它应该运行得更快,可能仅受网卡带宽或CPU速度的限制。