Cowboy: An Open-Source WebSocket Network Library
Cowboy.WebSockets is an open-source WebSocket network library written in .NET/C# and hosted on GitHub. It fully implements RFC 6455 (The WebSocket Protocol) and partially implements RFC 7692 (Compression Extensions for WebSocket).
WebSocket can be understood as an additional handshake performed on top of an established TCP connection, together with a defined message framing format.
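To make the "additional handshake" concrete, here is a minimal sketch of how a server derives the Sec-WebSocket-Accept response header from the client's Sec-WebSocket-Key, as specified in RFC 6455. The class and method names (WebSocketHandshake, ComputeAcceptKey) are illustrative assumptions and are not taken from Cowboy.WebSockets; only standard .NET calls are used.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper for illustration; not part of Cowboy.WebSockets.
public static class WebSocketHandshake
{
    // Fixed GUID that RFC 6455 appends to the client key.
    private const string MagicGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Returns the value the server sends back in Sec-WebSocket-Accept:
    // Base64(SHA-1(clientKey + MagicGuid)).
    public static string ComputeAcceptKey(string secWebSocketKey)
    {
        if (secWebSocketKey == null)
            throw new ArgumentNullException("secWebSocketKey");

        using (var sha1 = SHA1.Create())
        {
            byte[] input = Encoding.ASCII.GetBytes(secWebSocketKey.Trim() + MagicGuid);
            byte[] hash = sha1.ComputeHash(input);
            return Convert.ToBase64String(hash);
        }
    }
}
```

With the sample key from RFC 6455, ComputeAcceptKey("dGhlIHNhbXBsZSBub25jZQ==") yields "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=". A client checks this value to confirm that the peer really speaks WebSocket before any frames are exchanged.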

Communication and data transfer on the channel are governed by control frames and data frames. The figure below describes the frame header format in ABNF.

[Figure: WebSocket frame header format]
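As a rough illustration of the frame header described here, the following sketch decodes the fixed part of a header following the RFC 6455 layout: the FIN bit, the opcode, the MASK bit, and the 7-bit, 16-bit, or 64-bit payload length. The FrameHeader type and its members are hypothetical names chosen for this example and do not reflect how Cowboy.WebSockets parses frames internally.

```csharp
using System;

// Hypothetical type for illustration; not part of Cowboy.WebSockets.
public sealed class FrameHeader
{
    public bool Fin { get; private set; }
    public int Opcode { get; private set; }
    public bool Masked { get; private set; }
    public long PayloadLength { get; private set; }
    // Number of bytes before the payload starts, including any masking key.
    public int HeaderLength { get; private set; }

    // Opcodes 0x8-0xF (close, ping, pong, reserved) are control frames.
    public bool IsControlFrame { get { return (Opcode & 0x8) != 0; } }

    public static FrameHeader Parse(byte[] buffer)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        Require(buffer, 2);

        var header = new FrameHeader();
        header.Fin = (buffer[0] & 0x80) != 0;    // bit 0: final fragment
        header.Opcode = buffer[0] & 0x0F;        // bits 4-7: opcode
        header.Masked = (buffer[1] & 0x80) != 0; // bit 8: masking flag

        long length = buffer[1] & 0x7F;          // bits 9-15: payload length
        int offset = 2;
        if (length == 126)
        {
            // Next 2 bytes hold the length (big-endian).
            Require(buffer, 4);
            length = (buffer[2] << 8) | buffer[3];
            offset = 4;
        }
        else if (length == 127)
        {
            // Next 8 bytes hold the length (big-endian).
            Require(buffer, 10);
            length = 0;
            for (int i = 2; i < 10; i++)
                length = (length << 8) | buffer[i];
            offset = 10;
        }

        if (header.Masked) offset += 4;          // 4-byte masking key follows
        Require(buffer, offset);

        header.PayloadLength = length;
        header.HeaderLength = offset;
        return header;
    }

    private static void Require(byte[] buffer, int count)
    {
        if (buffer.Length < count)
            throw new ArgumentException("Buffer is too short for a complete frame header.");
    }
}
```

For example, the unmasked text frame 0x81 0x05 "Hello" from RFC 6455 parses as FIN set, opcode 1 (text), payload length 5, header length 2. The masked variant beginning 0x81 0x85 has a header length of 6 because of the masking key.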

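Cowboy.WebSockets implements the WebSocket client and server separately, as AsyncWebSocketClient and AsyncWebSocketServer respectively.
Internally, Cowboy.WebSockets is built on the TAP (Task-based Asynchronous Pattern) implementations AsyncTcpSocketServer and AsyncTcpSocketClient from Cowboy.Sockets. For more on Cowboy.Sockets, see the article "Multiple Ways to Implement High-Performance TCP Services in C#" (《C#高性能TCP服务的多种实现方式》).
The NuGet packages can be found by searching for "Cowboy" on NuGet.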

WebSocket Server Application
Implement the AsyncWebSocketServerModule abstract class, where ModulePath corresponds to the path part of "ws://host:port/path". You can implement multiple modules and register them in an AsyncWebSocketServerModuleCatalog, or discover modules automatically, for example via reflection.
    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Cowboy.WebSockets;

    public class TestWebSocketModule : AsyncWebSocketServerModule
    {
        // The module handles connections to ws://host:port/test.
        public TestWebSocketModule()
            : base(@"/test")
        {
        }

        public override async Task OnSessionStarted(AsyncWebSocketSession session)
        {
            Console.WriteLine(string.Format("WebSocket session [{0}] has connected.", session.RemoteEndPoint));
            await Task.CompletedTask;
        }

        // Echo received text back to the sender.
        public override async Task OnSessionTextReceived(AsyncWebSocketSession session, string text)
        {
            Console.Write(string.Format("WebSocket session [{0}] received Text --> ", session.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await session.SendTextAsync(text);
        }

        // Echo received binary data back to the sender.
        public override async Task OnSessionBinaryReceived(AsyncWebSocketSession session, byte[] data, int offset, int count)
        {
            var text = Encoding.UTF8.GetString(data, offset, count);
            Console.Write(string.Format("WebSocket session [{0}] received Binary --> ", session.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await session.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
        }

        public override async Task OnSessionClosed(AsyncWebSocketSession session)
        {
            Console.WriteLine(string.Format("WebSocket session [{0}] has disconnected.", session.RemoteEndPoint));
            await Task.CompletedTask;
        }
    }
Instantiate AsyncWebSocketServer and pass in the AsyncWebSocketServerModuleCatalog instance to start listening as a WebSocket server.
    class Program
    {
        static AsyncWebSocketServer _server;

        static void Main(string[] args)
        {
            NLogLogger.Use();

            try
            {
                var catalog = new AsyncWebSocketServerModuleCatalog();
                catalog.RegisterModule(new TestWebSocketModule());

                var config = new AsyncWebSocketServerConfiguration();
                //config.SslEnabled = true;
                //config.SslServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.pfx", "Cowboy");
                //config.SslPolicyErrorsBypassed = true;

                _server = new AsyncWebSocketServer(22222, catalog, config);
                _server.Listen();

                Console.WriteLine("WebSocket server has been started on [{0}].", _server.ListenedEndPoint);
                Console.WriteLine("Type something to send to clients...");
                while (true)
                {
                    try
                    {
                        string text = Console.ReadLine();
                        if (text == "quit")
                            break;
                        Task.Run(async () =>
                        {
                            //await _server.BroadcastText(text);
                            //Console.WriteLine("WebSocket server [{0}] broadcasts text -> [{1}].", _server.ListenedEndPoint, text);
                            await _server.BroadcastBinaryAsync(Encoding.UTF8.GetBytes(text));
                            Console.WriteLine("WebSocket server [{0}] broadcasts binary -> [{1}].", _server.ListenedEndPoint, text);
                        });
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }

                _server.Shutdown();
                Console.WriteLine("WebSocket server has been stopped on [{0}].", _server.ListenedEndPoint);
            }
            catch (Exception ex)
            {
                Logger.Get().Error(ex.Message, ex);
            }

            Console.ReadKey();
        }
    }
WebSocket Client Application
On the client side, there are two ways to instantiate AsyncWebSocketClient:
- implement the IAsyncWebSocketClientMessageDispatcher interface;
- pass Func<> implementations that handle the various events directly to the constructor.
    public interface IAsyncWebSocketClientMessageDispatcher
    {
        Task OnServerConnected(AsyncWebSocketClient client);
        Task OnServerTextReceived(AsyncWebSocketClient client, string text);
        Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count);
        Task OnServerDisconnected(AsyncWebSocketClient client);

        Task OnServerFragmentationStreamOpened(AsyncWebSocketClient client, byte[] data, int offset, int count);
        Task OnServerFragmentationStreamContinued(AsyncWebSocketClient client, byte[] data, int offset, int count);
        Task OnServerFragmentationStreamClosed(AsyncWebSocketClient client, byte[] data, int offset, int count);
    }
The demo below uses the second approach.
    class Program
    {
        static AsyncWebSocketClient _client;

        static void Main(string[] args)
        {
            NLogLogger.Use();

            Task.Run(async () =>
            {
                try
                {
                    var config = new AsyncWebSocketClientConfiguration();
                    //config.SslTargetHost = "Cowboy";
                    //config.SslClientCertificates.Add(new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.cer"));
                    //config.SslPolicyErrorsBypassed = true;

                    //var uri = new Uri("ws://echo.websocket.org/");
                    //var uri = new Uri("wss://127.0.0.1:22222/test");
                    var uri = new Uri("ws://127.0.0.1:22222/test");
                    _client = new AsyncWebSocketClient(uri,
                        OnServerTextReceived,
                        OnServerBinaryReceived,
                        OnServerConnected,
                        OnServerDisconnected,
                        config);
                    await _client.Connect();

                    Console.WriteLine("WebSocket client has connected to server [{0}].", uri);
                    Console.WriteLine("Type something to send to server...");
                    while (_client.State == WebSocketState.Open)
                    {
                        try
                        {
                            string text = Console.ReadLine();
                            if (text == "quit")
                                break;
                            Task.Run(async () =>
                            {
                                //await _client.SendText(text);
                                //Console.WriteLine("Client [{0}] send text -> [{1}].", _client.LocalEndPoint, text);
                                await _client.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
                                Console.WriteLine("Client [{0}] send binary -> [{1}].", _client.LocalEndPoint, text);
                            }).Forget();
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    }

                    await _client.Close(WebSocketCloseCode.NormalClosure);
                    Console.WriteLine("WebSocket client has disconnected from server [{0}].", uri);
                }
                catch (Exception ex)
                {
                    Logger.Get().Error(ex.Message, ex);
                }
            }).Wait();

            Console.ReadKey();
        }

        private static async Task OnServerConnected(AsyncWebSocketClient client)
        {
            Console.WriteLine(string.Format("WebSocket server [{0}] has connected.", client.RemoteEndPoint));
            await Task.CompletedTask;
        }

        private static async Task OnServerTextReceived(AsyncWebSocketClient client, string text)
        {
            Console.Write(string.Format("WebSocket server [{0}] received Text --> ", client.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await Task.CompletedTask;
        }

        private static async Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count)
        {
            var text = Encoding.UTF8.GetString(data, offset, count);
            Console.Write(string.Format("WebSocket server [{0}] received Binary --> ", client.RemoteEndPoint));
            Console.WriteLine(string.Format("{0}", text));

            await Task.CompletedTask;
        }

        private static async Task OnServerDisconnected(AsyncWebSocketClient client)
        {
            Console.WriteLine(string.Format("WebSocket server [{0}] has disconnected.", client.RemoteEndPoint));
            await Task.CompletedTask;
        }
    }
Related Resources
- RFC6455: The WebSocket Protocol
- RFC7692: Compression Extensions for WebSocket
- IANA: WebSocket Opcode Registry
- LZ77 and LZ78
- LZ77 Compression Algorithm
- DeflateStream Class
- zlib 1.2.8 Manual
- WebSocket Extensions in Tyrus
Author: 匠心十年
Original link: https://www.cnblogs.com/gaochundong/p/cowboy_websockets.html
This article is reposted from cnblogs (博客园); copyright belongs to the original author.
