欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

.NET Standard中使用TCPListener和TCPClient的高性能TCP客户端服务器

程序员文章站 2022-05-28 12:48:22
...

目录

介绍

TCP Server

TCP客户端

结论


介绍

随着即将出现的.NET 5和需要从.NET 4.8.NET Core迁移的人,此源代码旨在提供一个示例,说明如何通过本机TCP建立高性能的跨平台Client Server消息交换。符合.NET Standard,而无需与.NET Framework.NET Core或其他任何特定关系。

此外,它还解决了TCP会话遇到的一个常见问题:消息自旋锁问题和异步内存泄漏问题和/CancellationToken异常问题,这些问题在TCP Client Server实现中都很常见。

TCP Server

为了简单起见,我们将使用CLI项目,项目类型本身可以是.NET 5.NET Core.NET Framework。客户端和服务器均以.NET Standard语法编码,因此它们可以与所有这三个无缝连接。

Main()服务器主机的典型CLI 代码块:

using System;

public static class Program
{
    public static void Main()
    {
        Console.WriteLine("Press esc key to stop");

        int i = 0;
        void PeriodicallyClearScreen()
        {
            i++;
            if (i > 15)
            {
                Console.Clear();
                Console.WriteLine("Press esc key to stop");
                i = 0;
            }
        }

        //Write the host messages to the console
        void OnHostMessage(string input)
        {
            PeriodicallyClearScreen();
            Console.WriteLine(input);
        }

        var BLL = new ServerHost.Host(OnHostMessage);
        BLL.RunServerThread();

        while (Console.ReadKey().Key != ConsoleKey.Escape)
        {
            Console.Clear();
            Console.WriteLine("Press esc key to stop");
        }

        Console.WriteLine("Attempting clean exit");
        BLL.WaitForServerThreadToStop();

        Console.WriteLine("Exiting console Main.");
    }
}

基本的CLI管道在这里,没什么异常。

Esc键退出客户端窗口,每隔15条消息就会清除该窗口(仅用于调试/演示目的)。不要在生产版本中将实际的网络消息写入控制台。

此代码块的唯一关键之处在于,为了保持高性能,客户端和服务器托管在专用线程中。也就是说,与执行Main块的线程分开。该功能包含在RunServerThread()函数中。

为此,我们将创建一个Host类并将其添加到.NET Standard库项目类型中。.NET5.NET Framework.NET Core项目可以引用.NET Standard库,因此它确实是最灵活的选择在撰写本文时。

向其添加以下代码:

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

public class Host
{
    #region Public Functions
    public virtual void RunServerThread()
    {
        this.ServerThread.Start();
        this.OnHostMessages.Invoke("Server started");
    }

    public virtual void WaitForServerThreadToStop()
    {
         this.Server.ExitSignal = true;
         this.OnHostMessages.Invoke("Exit Signal sent to server thread");
         this.OnHostMessages.Invoke("Joining server thread");
         this.ServerThread.Join();
         this.OnHostMessages.Invoke("Server thread has exited gracefully");
    }
    #endregion
}

RunServerThread()函数启动将运行服务器的Thread

WaitForServerThreadToStop()函数向服务器线程发出信号,通知它应该在尽可能快的时间正常退出,然后我们将服务器线程加入到调用线程(Main()在本例中为线程),否则CLI窗口只会终止/中止,而我们不会不想从清理/异常处理的角度做;优美/干净的退出是可取的。

将支持变量和构造方法添加到Server Host类中:

#region Public Delegates
public delegate void HostMessagesDelegate(string message);
#endregion

#region Variables

protected readonly StandardServer.Server Server;
protected readonly Thread ServerThread;

#region Callbacks
protected readonly HostMessagesDelegate OnHostMessages;
#endregion

#endregion

#region Constructor
public Host(HostMessagesDelegate onHostMessages)
{
    this.OnHostMessages = onHostMessages ?? 
         throw new ArgumentNullException(nameof(onHostMessages));
    this.Server = new StandardServer.Server(this.OnMessage, this.ConnectionHandler);
    this.ServerThread = new Thread(this.Server.Run);
}
#endregion

#region Protected Functions
protected virtual void OnMessage(string message)
{
    this.OnHostMessages.Invoke(message);
}
        
protected virtual void ConnectionHandler(NetworkStream connectedAutoDisposedNetStream)
{
}
#endregion
  • ServerThread:托管服务器的线程(Server类)
  • OnHostMessages OnMessage:仅用于演示目的,将消息从TCP连接器线程推送到客户端CLI窗口。(注意:对于WinForm应用程序,如果要向最终用户显示消息,则必须跨GUI线程执行ISynchronizeInvoke操作。Console.Write没有此限制。)
  • ConnectionHandler:我们将回到这一点。
  • Server:我们将要编写的TCP Server类代码。

创建一个名为Server的类,并向其中添加以下代码:

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public class Server
{
    #region Public Functions
    public virtual void Run()
    {
        if (this.IsRunning)
            return; //Already running, only one running instance allowed.

        this.IsRunning = true;
        this.ExitSignal = false;

        while (!this.ExitSignal)
            this.ConnectionLooper();

        this.IsRunning = false;
   }
   #endregion
}

由于Run()是在专用线程中运行,因此您可能需要添加一个Try Catch块来调试所有未处理的异常,并根据需要添加日志记录。为了清楚起见,我将由您自己决定。

Run()函数处理ExitSignal检查,并将其余的TCP逻辑推入ConnectionLooper()函数。此时,您可能会认为这是一个无限的自旋循环,您是正确的,但我们将使用await来解决这个问题。

将支持变量添加ConstructorServer类中:

#region Public Properties
private volatile bool _ExitSignal;
public virtual bool ExitSignal
{
    get => this._ExitSignal;
    set => this._ExitSignal = value;
}
#endregion

#region Public Delegates
public delegate void ConnectionHandlerDelegate(NetworkStream connectedAutoDisposedNetStream);
public delegate void MessageDelegate(string message);
#endregion

#region Variables

#region Init/State
protected readonly int AwaiterTimeoutInMS;
protected readonly string Host;
protected readonly int Port;
protected readonly int MaxConcurrentListeners;
protected readonly TcpListener Listener;
        
protected bool IsRunning;
protected List<Task> TcpClientTasks = new List<Task>();
#endregion

#region Callbacks
protected readonly ConnectionHandlerDelegate OnHandleConnection;
protected readonly MessageDelegate OnMessage;
#endregion

#endregion

#region Constructor
public Server(
              MessageDelegate onMessage, 
              ConnectionHandlerDelegate connectionHandler, 
              string host = "0.0.0.0",
              int port = 8080, 
              int maxConcurrentListeners = 10,
              int awaiterTimeoutInMS = 500
             )
{
     this.OnMessage = onMessage ?? throw new ArgumentNullException(nameof(onMessage));
     this.OnHandleConnection = connectionHandler ?? 
          throw new ArgumentNullException(nameof(connectionHandler));
     this.Host = host ?? throw new ArgumentNullException(nameof(host));
     this.Port = port;
     this.MaxConcurrentListeners = maxConcurrentListeners;
     this.AwaiterTimeoutInMS = awaiterTimeoutInMS;
     this.Listener = new TcpListener(IPAddress.Parse(this.Host), this.Port);
}
#endregion

这里没有什么幻想,唯一需要注意的是,_ExitSignal成员变量的类型为volatile,这有助于防止CLI Main()线程与服务器主机线程之间的陈旧获取/设置。对于演示目的,这比LockMutex更为简单,并且可能占用较少的CPU/内存。

IP 0.0.0.0IPAddress Any。您可以根据需要更改或删除默认值。

在这里,我们保持TcpClient连接任务(异步任务)的List,这可能是大小(maxConcurrentListeners)的数组代替List。如果这样做,它的运行速度可能会快几微秒。

OnMessage 仅用于演示目的,以在CLI窗口中显示消息。

OnHandleConnection 是回调,此类的使用者将在该回调中编码其业务案例专用的网络逻辑。

添加以下代码以实现ConnectionLooper()函数:

#region Protected Functions
protected virtual void ConnectionLooper()
{
     while (this.TcpClientTasks.Count < this.MaxConcurrentListeners)
     {
        var AwaiterTask = Task.Run(async () =>
        {
             this.ProcessMessagesFromClient(await this.Listener.AcceptTcpClientAsync());
        });
        this.TcpClientTasks.Add(AwaiterTask);
     }

     int RemoveAtIndex = Task.WaitAny(this.TcpClientTasks.ToArray(), this.AwaiterTimeoutInMS);
     
     if (RemoveAtIndex > 0) 
        this.TcpClientTasks.RemoveAt(RemoveAtIndex);
}
#endregion

在这里,我们的服务器正在异步侦听大量TCP连接(仅限于int MaxConcurrentListeners),这可以防止内部.NET ThreadPool线程耗尽。这取决于vm/server/host上的CPU内核数。您的主机越强大,您将能够支持更多的并发监听器。

Task await建立一个线程的延续,它进行到ProcessMessagesFromClient当客户端连接成功,该函数是其中发生所述网络通信的实际处理。

然后WaitAny(),我们在这些等待的任务列表中,但是对于给定的毫秒数,这部分才是关键。如果没有超时,我们将不会检测到ExitSignal变量的更改,而这是在多线程环境中正常退出的关键。WaitAny防止自旋锁定,同时还检测退出的TCPListener Tasks .NET内部ThreadPool线程)。

它还避免了内存泄漏无限/过多的等待,线程或任务,同时仍异步处理连接。

如果任务中止,则抛出AcceptTcpClientAsync异常,您可能需要为每个任务提供一个CancellationToken,并在您要求正常退出的任务列表中的每个任务。由于我们从不调用Listener.Stop(),因此当主机Thread正常退出时,此示例代码中的.NET GC在内部进行了清理。

WaitAny检测到某个任务已完成时,将返回RemoveAt索引,以便我们可以简单地从列表中删除该任务,在执行ExitSignal检查后的下一个遍历中重新添加一个新任务。

添加以下代码以实现该ProcessMessagesFromClient()函数:

protected virtual void ProcessMessagesFromClient(TcpClient Connection)
{
    using (Connection)
    {
         if (!Connection.Connected)
            return;

         using (var netstream = Connection.GetStream())
         {
            this.OnHandleConnection.Invoke(netstream);
         }
    }
}

ThreadPool成功连接后,此函数在连续线程中运行,通常不会与ServerThreador Main()线程相同。如果您需要严格执行,可以添加ConfigureAwait(false)

TCPClientNetworkStream使用Using语法自动关闭和释放,这样,这个类的消费者并不需要关注自身与清理。

OnHandleConnection.Invoke本质上从我们现在会写的Host类中调用ConnectionHandler()函数:

protected virtual void ConnectionHandler(NetworkStream connectedAutoDisposedNetStream)
{
     if (!connectedAutoDisposedNetStream.CanRead && !connectedAutoDisposedNetStream.CanWrite)
        return; //We need to be able to read and write

     var writer = new StreamWriter(connectedAutoDisposedNetStream) { AutoFlush = true };
     var reader = new StreamReader(connectedAutoDisposedNetStream);

     var StartTime = DateTime.Now;
     int i = 0;
     while (!this.Server.ExitSignal) //Tight network message-loop (optional)
     {
          var JSON_Helper = new Helper.JSON();
          string JSON = JSON_Helper.JSONstring();

          string Response;
          try //Communication block
          {
              //Synchronously send some JSON to the connected client
              writer.WriteLine(JSON);
              //Synchronously wait for a response from the connected client
              Response = reader.ReadLine();
          }
          catch (IOException ex)
          {
              _ = ex; //Add Debug breakpoint and logging here
             return; //Swallow exception and Exit function on network error
          }

          //Put breakpoint here to inspect the JSON string return by the connected client
          Helper.SomeDataObject Data = JSON_Helper.DeserializeFromJSON(Response);
          _ = Data;

          //Update stats
          i++;
          var ElapsedTime = DateTime.Now - StartTime;
          if (ElapsedTime.TotalMilliseconds >= 1000)
          {
              this.OnHostMessages.Invoke("Messages per second: " + i);
              i = 0;
              StartTime = DateTime.Now;
          }
     }
}

这主要是您自己编写的样板网络消息传递代码。如果您只需要推送1条消息,然后删除while-loop,将ExitSignal检查保持为if-statement不会受到损害,则可以根据需要多次检查volatile bool

网络流支持字符串行和二进制字节数组传输。

在这个特定的演示中,我们将一个对象序列化和反序列化为JSON XML作为string,并通过Write/Read-Line来回发送(如果需要其他字符集,则.NET会将其自动转换为ASCII字符数组bytes[]。有几个转换助手。)

这里的一个主题是,如果由于某种原因连接丢失,则对流的写入和读取操作可能会抛出一个IOException,您将需要在Try... Catch块中进行处理。如果您需要处理或冒泡异常,可以使用throw;代替return;

如前所述,当Thread或任务继续完成或中止/抛出时,该Usings块将清除网络连接。

通常,我从不使用throw ex;因为它抛弃了最顶部的异常信息,其中作为throw;维护完整的堆栈跟踪/内部异常。

TCP客户端

TCP客户端的CLIHost类实际上与TCP Server相同,完整版本已附加并可以下载。

下面显示的唯一区别是客户端使用TCP Client而不是TCP Listener,并且该ConnectionLooper()函数更简单,因为我们不处理多个并发的入站连接。

using System;
using System.Net.Sockets;
using System.Threading;

#region Public Functions
public virtual void Run()
{
    if (this.IsRunning)
       return; //Already running, only one running instance allowed.

    this.IsRunning = true;
    this.ExitSignal = false;

    while (!this.ExitSignal)
        this.ConnectionLooper();

    this.IsRunning = false;
}
#endregion

#region Protected Functions
protected virtual void ConnectionLooper()
{
     this.OnMessage.Invoke("Attempting server connection... ");
     using (var Client = new TcpClient())
     {
         try
         {
            Client.Connect(this.Host, this.Port);
         }
         catch(SocketException ex)
         {
             this.OnMessage.Invoke(ex.Message);
             //Server is unavailable, wait before re-trying
             Thread.Sleep(this.ConnectionAttemptDelayInMS); 
             return; //Swallow exception
         }

         if (!Client.Connected) //exit function if not connected
             return;

         using (var netstream = Client.GetStream())
         {
             //Process the connection
             this.OnHandleConnection.Invoke(netstream); 
         }
    }
}
#endregion

在这里,我们仅使用线程休眠,如果您将客户端托管在任务(ThreadPool管理器)中,则Task.Delay是首选。

如果您不需要外部设备ExitSignal来正常退出,请务必删除它,因为这样做可以简化实现,即,您的流程在准备好退出时可以自行退出。我仅将其添加到演示示例中,以说明如何为需要该功能的用户完成此操作。

如果仅进行单个连接或间歇连接,则不需要包含While循环。在此示例中,我们无限期地等待服务器上线,如果它们不同,则需要根据您的要求进行自定义。

此外,如果您需要通过同一进程建立多个并发客户端连接,那么您也可以这样做,在这种情况下,您一定要使用与Server类类似的设计,该设计利用了可等待的WaitAny()异步TaskList模式,而不仅仅是一个此示例代码中显示了简单的同步Connect()循环。

结论

希望有人发现此代码有用。

仅使用一个核心VM,我就可以实现多达74000个双向JSON交换,因此性能应该不错。在此示例情况下,.NET4.8中的服务器和Core3.1中的客户端用于主机。

如果您不序列化对象,则它应该运行得更快,可能仅受网卡带宽或CPU速度的限制。

.NET Standard中使用TCPListener和TCPClient的高性能TCP客户端服务器