C# 高性能 TCP 服務(wù)的多種實(shí)現(xiàn)方式
本篇文章的主旨是使用?.NET/C# 實(shí)現(xiàn) TCP 高性能服務(wù)的不同方式,包括但不限于如下內(nèi)容:
- APM 方式,即 Asynchronous Programming Model
- TAP 方式,即 Task-based Asynchronous Pattern
- SAEA 方式,即 SocketAsyncEventArgs
- RIO 方式,即 Registered I/O
在 .NET/C# 中對于 Socket 的支持均是基于?Windows I/O Completion Ports?完成端口技術(shù)的封裝,通過不同的 Non-Blocking 封裝結(jié)構(gòu)來滿足不同的編程需求。以上方式均已在?Cowboy.Sockets?中有完整實(shí)現(xiàn),并且 APM 和 TAP 方式已經(jīng)在實(shí)際項目中應(yīng)用。Cowboy.Sockets?還在不斷的進(jìn)化和完善中,如有任何問題請及時指正。
雖然有這么多種實(shí)現(xiàn)方式,但抽象的看,它們是一樣一樣的,用兩個 Loop 即可描述:Accept Loop?和?Read Loop,如下圖所示。(這里提及的 "Loop" 指的是一種循環(huán)方式,而非特指?while/for 等關(guān)鍵字。)

- 在任何 TCP Server 的實(shí)現(xiàn)中,一定存在一個 Accept Socket Loop,用于接收 Client 端的 Connect 請求以建立 TCP Connection。
- 在任何 TCP Server 的實(shí)現(xiàn)中,一定存在一個 Read Socket Loop,用于接收 Client 端 Write 過來的數(shù)據(jù)。
如果 Accept 循環(huán)阻塞,則會導(dǎo)致無法快速的建立連接,服務(wù)端 Pending Backlog 滿,進(jìn)而導(dǎo)致 Client 端收到 Connect Timeout 的異常。如果 Read 循環(huán)阻塞,則顯然會導(dǎo)致無法及時收到 Client 端發(fā)過來的數(shù)據(jù),進(jìn)而導(dǎo)致 Client 端 Send Buffer 滿,無法再發(fā)送數(shù)據(jù)。
從實(shí)現(xiàn)細(xì)節(jié)的角度看,能夠?qū)е路?wù)阻塞的位置可能在:
- Accept 到新的?Socket,構(gòu)建新的 Connection 需要分配各種資源,分配資源慢;
- Accept 到新的?Socket,沒有及時觸發(fā)下一次 Accept;
- Read 到新的?Buffer,判定?Payload?消息長度,判定過程長;
- Read 到新的?Buffer,發(fā)現(xiàn) Payload 還沒有收全,繼續(xù) Read,則 "可能" 會導(dǎo)致一次 Buffer Copy;
- Payload 接收完畢,進(jìn)行 De-Serialization 轉(zhuǎn)成可識別的 Protocol Message,反序列化慢;
- 由 Business Module 來處理相應(yīng)的 Protocol Message,處理過程慢;
1-2 涉及到 Accept 過程和 Connection 的建立過程,3-4 涉及到 ReceiveBuffer 的處理過程,5-6 涉及到應(yīng)用邏輯側(cè)的實(shí)現(xiàn)。
Java 中著名的 Netty 網(wǎng)絡(luò)庫從 4.0 版本開始對于 Buffer 部分做了全新的嘗試,采用了名叫?ByteBuf?的設(shè)計,實(shí)現(xiàn) Buffer Zero Copy 以減少高并發(fā)條件下?Buffer 拷貝帶來的性能損失和 GC 壓力。DotNetty,Orleans?,Helios?等項目正在嘗試在 C# 中進(jìn)行類似的 ByteBuf 的實(shí)現(xiàn)。
APM 方式:TcpSocketServer
TcpSocketServer?的實(shí)現(xiàn)是基于 .NET Framework 自帶的?TcpListener?和?TcpClient?的更進(jìn)一步的封裝,采用基于 APM 的 BeginXXX 和 EndXXX 接口實(shí)現(xiàn)。
TcpSocketServer?中的 Accept Loop 指的就是,
- BeginAccept -> EndAccept->?BeginAccept -> EndAccept?-> BeginAccept -> ...
每一個建立成功的 Connection 由?TcpSocketSession?來處理,所以 TcpSocketSession 中會包含 Read Loop,
- BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> ...
TcpSocketServer 通過暴露 Event 來實(shí)現(xiàn) Connection 的建立與斷開和數(shù)據(jù)接收的通知。
event EventHandlerClientConnected; event EventHandler ClientDisconnected; event EventHandler ClientDataReceived;
使用也是簡單直接,直接訂閱事件通知。
private static void StartServer() { _server = new TcpSocketServer(22222); _server.ClientConnected += server_ClientConnected; _server.ClientDisconnected += server_ClientDisconnected; _server.ClientDataReceived += server_ClientDataReceived; _server.Listen(); } static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session)); } static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e) { Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session)); } static void server_ClientDataReceived(object sender, TcpClientDataReceivedEventArgs e) { var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength); Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session)); Console.WriteLine(string.Format("{0}", text)); _server.Broadcast(Encoding.UTF8.GetBytes(text)); }
TAP 方式:AsyncTcpSocketServer
AsyncTcpSocketServer?的實(shí)現(xiàn)是基于 .NET Framework 自帶的?TcpListener?和?TcpClient?的更進(jìn)一步的封裝,采用基于 TAP 的 async/await 的 XXXAsync 接口實(shí)現(xiàn)。
然而,實(shí)際上?XXXAsync?并沒有創(chuàng)建什么神奇的效果,其內(nèi)部實(shí)現(xiàn)只是將 APM 的方法轉(zhuǎn)換成了 TAP 的調(diào)用方式。
//************* Task-based async public methods ************************* [HostProtection(ExternalThreading = true)] public TaskAcceptSocketAsync() { return Task .Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null); } [HostProtection(ExternalThreading = true)] public Task AcceptTcpClientAsync() { return Task .Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null); }
AsyncTcpSocketServer?中的 Accept Loop 指的就是,
while (IsListening) { var tcpClient = await _listener.AcceptTcpClientAsync(); }
每一個建立成功的 Connection 由?AsyncTcpSocketSession?來處理,所以?AsyncTcpSocketSession 中會包含 Read Loop,
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); }
為了將?async/await 異步到底,AsyncTcpSocketServer?所暴露的接口也同樣是 Awaitable 的。
public interface IAsyncTcpSocketServerMessageDispatcher { Task OnSessionStarted(AsyncTcpSocketSession session); Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count); Task OnSessionClosed(AsyncTcpSocketSession session); }
使用時僅需將一個實(shí)現(xiàn)了該接口的對象注入到 AsyncTcpSocketServer 的構(gòu)造函數(shù)中即可。
public class SimpleMessageDispatcher : IAsyncTcpSocketServerMessageDispatcher { public async Task OnSessionStarted(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(AsyncTcpSocketSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
當(dāng)然,對于接口的實(shí)現(xiàn)也不是強(qiáng)制了,也可以在構(gòu)造函數(shù)中直接注入方法的實(shí)現(xiàn)。
public AsyncTcpSocketServer( IPEndPoint listenedEndPoint, Funcbyte[], int, int, Task> onSessionDataReceived = null, Func onSessionStarted = null, Func onSessionClosed = null, AsyncTcpSocketServerConfiguration configuration = null) {}
SAEA 方式:TcpSocketSaeaServer
SAEA 是?SocketAsyncEventArgs?的簡寫。SocketAsyncEventArgs?是?.NET Framework 3.5?開始支持的一種支持高性能 Socket 通信的實(shí)現(xiàn)。SocketAsyncEventArgs 相比于 APM 方式的主要優(yōu)點(diǎn)可以描述如下:
The main feature of these enhancements is the?avoidance of the repeated allocation and synchronization of objects?during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
也就是說,優(yōu)點(diǎn)就是無需為每次調(diào)用都生成?IAsyncResult 等對象,向原生 Socket 更靠近一些。
使用 SocketAsyncEventArgs 的推薦步驟如下:
- Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
- Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
- Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
- If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
- If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
- Reuse the context for another operation, put it back in the pool, or discard it.
重點(diǎn)在于池化(Pooling),池化的目的就是為了重用和減少運(yùn)行時分配和垃圾回收的壓力。
TcpSocketSaeaServer?即是對 SocketAsyncEventArgs 的應(yīng)用和封裝,并實(shí)現(xiàn)了 Pooling 技術(shù)。TcpSocketSaeaServer?中的重點(diǎn)是 SaeaAwaitable 類,SaeaAwaitable 中內(nèi)置了一個 SocketAsyncEventArgs,并通過 GetAwaiter 返回 SaeaAwaiter 來支持 async/await 操作。同時,通過 SaeaExtensions 擴(kuò)展方法對來擴(kuò)展?SocketAsyncEventArgs 的 Awaitable 實(shí)現(xiàn)。
public static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable) public static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)
SaeaPool 則是一個 QueuedObjectPool 的衍生實(shí)現(xiàn),用于池化 SaeaAwaitable 實(shí)例。同時,為了減少?TcpSocketSaeaSession 的構(gòu)建過程,也實(shí)現(xiàn)了 SessionPool 即?QueuedObjectPool。
TcpSocketSaeaServer?中的 Accept Loop 指的就是,
while (IsListening) { var saea = _acceptSaeaPool.Take(); var socketError = await _listener.AcceptAsync(saea); if (socketError == SocketError.Success) { var acceptedSocket = saea.Saea.AcceptSocket; } _acceptSaeaPool.Return(saea); }
每一個建立成功的 Connection 由?TcpSocketSaeaSession?來處理,所以?TcpSocketSaeaSession?中會包含 Read Loop,
var saea = _saeaPool.Take(); saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length); while (State == TcpSocketConnectionState.Connected) { saea.Saea.SetBuffer(0, _receiveBuffer.Length); var socketError = await _socket.ReceiveAsync(saea); if (socketError != SocketError.Success) break; var receiveCount = saea.Saea.BytesTransferred; if (receiveCount == 0) break; }
同樣,TcpSocketSaeaServer?對外所暴露的接口也同樣是 Awaitable 的。
public interface ITcpSocketSaeaServerMessageDispatcher { Task OnSessionStarted(TcpSocketSaeaSession session); Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count); Task OnSessionClosed(TcpSocketSaeaSession session); }
使用起來也是簡單直接:
public class SimpleMessageDispatcher : ITcpSocketSaeaServerMessageDispatcher { public async Task OnSessionStarted(TcpSocketSaeaSession session) { Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(TcpSocketSaeaSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
RIO 方式:TcpSocketRioServer
從 Windows 8.1 / Windows Server 2012 R2 開始,微軟推出了?Registered I/O Networking Extensions?來支持高性能 Socket 服務(wù)的實(shí)現(xiàn),簡稱 RIO。
The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.
- RIOCloseCompletionQueue
- RIOCreateCompletionQueue
- RIOCreateRequestQueue
- RIODequeueCompletion
- RIODeregisterBuffer
- RIONotify
- RIOReceive
- RIOReceiveEx
- RIORegisterBuffer
- RIOResizeCompletionQueue
- RIOResizeRequestQueue
- RIOSend
- RIOSendEx
到目前為止,.NET Framework 還沒有推出對 RIO 的支持,所以若想在 C# 中實(shí)現(xiàn) RIO 則只能通過 P/Invoke 方式,RioSharp?是開源項目中的一個比較完整的實(shí)現(xiàn)。
Cowboy.Sockets?直接引用了?RioSharp?的源代碼,放置在?Cowboy.Sockets.Experimental?名空間下,以供實(shí)驗和測試使用。
同樣,通過?TcpSocketRioServer?來實(shí)現(xiàn) Accept Loop,
_listener.OnAccepted = (acceptedSocket) => { Task.Run(async () => { await Process(acceptedSocket); }) .Forget(); };
通過?TcpSocketRioSession?來處理 Read Loop,
while (State == TcpSocketConnectionState.Connected) { int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); if (receiveCount == 0) break; }
測試代碼一如既往的類似:
public class SimpleMessageDispatcher : ITcpSocketRioServerMessageDispatcher { public async Task OnSessionStarted(TcpSocketRioSession session) { //Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session)); Console.WriteLine(string.Format("TCP session has connected {0}.", session)); await Task.CompletedTask; } public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count) { var text = Encoding.UTF8.GetString(data, offset, count); //Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint)); Console.Write(string.Format("Client : --> ")); Console.WriteLine(string.Format("{0}", text)); await session.SendAsync(Encoding.UTF8.GetBytes(text)); } public async Task OnSessionClosed(TcpSocketRioSession session) { Console.WriteLine(string.Format("TCP session {0} has disconnected.", session)); await Task.CompletedTask; } }
作者:匠心十年
出處:https://www.cnblogs.com/gaochundong/
原文鏈接:https://www.cnblogs.com/gaochundong/p/csharp_tcp_service_models.html
本文轉(zhuǎn)自博客園網(wǎng),版權(quán)歸原作者所有。
