<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          C# 優(yōu)雅的處理 TCP 數(shù)據(jù)

          共 16191字,需瀏覽 33分鐘

           ·

          2024-04-18 22:25

          前言


          Tcp是一個(gè)面向連接的流數(shù)據(jù)傳輸協(xié)議,用人話說就是傳輸是一個(gè)已經(jīng)建立好連接的管道,數(shù)據(jù)都在管道里像流水一樣流淌到對(duì)端。

          那么數(shù)據(jù)必然存在幾個(gè)問題,比如數(shù)據(jù)如何持續(xù)的讀取,數(shù)據(jù)包的邊界等。

          Nagle's算法

          Nagle 算法的核心思想是,在一個(gè) TCP 連接上,最多只能有一個(gè)未被確認(rèn)的小數(shù)據(jù)包(小于 MSS,即最大報(bào)文段大小)

          優(yōu)勢(shì)

          減少網(wǎng)絡(luò)擁塞:通過合并小數(shù)據(jù)包,減少了網(wǎng)絡(luò)中的數(shù)據(jù)包數(shù)量,降低了擁塞的可能性。

          提高網(wǎng)絡(luò)效率:在低速網(wǎng)絡(luò)中,Nagle 算法可以顯著提高傳輸效率。

          劣勢(shì)

          增加延遲:在交互式應(yīng)用中,Nagle 算法可能導(dǎo)致顯著的延遲,因?yàn)樗却?ACK 或合并數(shù)據(jù)包。

          C#中如何配置?

          var _socket = new Socket(IPAddress.Any.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
          _serverSocket.NoDelay = _options.NoDelay;

          連接超時(shí)

          在調(diào)用客戶端Socket連接服務(wù)器的時(shí)候,可以設(shè)置連接超時(shí)機(jī)制,具體可以傳入一個(gè)任務(wù)的取消令牌,并且設(shè)置超時(shí)時(shí)間。

          CancellationTokenSource connectTokenSource = new CancellationTokenSource();
          connectTokenSource.CancelAfter(3000); //3秒
          await _socket.ConnectAsync(RemoteEndPoint, connectTokenSource.Token);

          SSL加密傳輸

          TCP使用SSL加密傳輸,通過非對(duì)稱加密的方式,利用證書,保證雙方使用了安全的密鑰加密了報(bào)文。在C#中如何配置?

          服務(wù)端配置

          //創(chuàng)建證書對(duì)象
          var _certificate  = _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword);

          //與客戶端進(jìn)行驗(yàn)證
          if (allowingUntrustedSSLCertificate) //是否允許不受信任的證書
          {
              SslStream = new SslStream(NetworkStream, false,
                  (obj, certificate, chain, error) => true);
          }
          else
          {
              SslStream = new SslStream(NetworkStream, false);
          }

          try
          {
              //serverCertificate:用于對(duì)服務(wù)器進(jìn)行身份驗(yàn)證的 X509Certificate
              //clientCertificateRequired:一個(gè) Boolean 值,指定客戶端是否必須為身份驗(yàn)證提供證書
              //checkCertificateRevocation:一個(gè) Boolean 值,指定在身份驗(yàn)證過程中是否檢查證書吊銷列表
              await SslStream.AuthenticateAsServerAsync(new SslServerAuthenticationOptions()
              {
                  ServerCertificate = x509Certificate,
                  ClientCertificateRequired = mutuallyAuthenticate,
                  CertificateRevocationCheckMode = checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck
              }, cancellationToken).ConfigureAwait(false);

              if (!SslStream.IsEncrypted || !SslStream.IsAuthenticated)
              {
                  return false;
              }

              if (mutuallyAuthenticate && !SslStream.IsMutuallyAuthenticated)
              {
                  return false;
              }
          }
          catch (Exception)
          {
              throw;
          }

          //完成驗(yàn)證后,通過SslStream傳輸數(shù)據(jù)
          int readCount = await SslStream.ReadAsync(buffer, _lifecycleTokenSource.Token)
              .ConfigureAwait(false);
          客戶端配置
          var _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword);

          if (_options.IsSsl) //如果使用ssl加密傳輸
          {
              if (_options.AllowingUntrustedSSLCertificate)//是否允許不受信任的證書
              {
                  _sslStream = new SslStream(_networkStream, false,
                          (obj, certificate, chain, error) => true);
              }
              else
              {
                  _sslStream = new SslStream(_networkStream, false);
              }

              _sslStream.ReadTimeout = _options.ReadTimeout;
              _sslStream.WriteTimeout = _options.WriteTimeout;
              await _sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions()
              {
                  TargetHost = RemoteEndPoint.Address.ToString(),
                  EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12,
                  CertificateRevocationCheckMode = _options.CheckCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck,
                  ClientCertificates = new X509CertificateCollection() { _certificate }
              }, connectTokenSource.Token).ConfigureAwait(false);

              if (!_sslStream.IsEncrypted || !_sslStream.IsAuthenticated ||
                  (_options.MutuallyAuthenticate && !_sslStream.IsMutuallyAuthenticated))
              {
                  throw new InvalidOperationException("SSL authenticated faild!");
              }
          }

          KeepAlive

          keepAlive不是TCP協(xié)議中的,而是各個(gè)操作系統(tǒng)本身實(shí)現(xiàn)的功能,主要是防止一些Socket突然斷開后沒有被感知到,導(dǎo)致一直浪費(fèi)資源的情況。

          其基本原理是在此機(jī)制開啟時(shí),當(dāng)長(zhǎng)連接無數(shù)據(jù)交互一定時(shí)間間隔時(shí),連接的一方會(huì)向?qū)Ψ桨l(fā)送保活探測(cè)包,如連接仍正常,對(duì)方將對(duì)此確認(rèn)回應(yīng)

          C#中如何調(diào)用操作系統(tǒng)的KeepAlive?

          /// <summary>
          /// 開啟Socket的KeepAlive
          /// 設(shè)置tcp協(xié)議的一些KeepAlive參數(shù)
          /// </summary>
          /// <param name="socket"></param>
          /// <param name="tcpKeepAliveInterval">沒有接收到對(duì)方確認(rèn),繼續(xù)發(fā)送KeepAlive的發(fā)送頻率</param>
          /// <param name="tcpKeepAliveTime">KeepAlive的空閑時(shí)長(zhǎng),或者說每次正常發(fā)送心跳的周期</param>
          /// <param name="tcpKeepAliveRetryCount">KeepAlive之后設(shè)置最大允許發(fā)送保活探測(cè)包的次數(shù),到達(dá)此次數(shù)后直接放棄嘗試,并關(guān)閉連接</param>
          internal static void SetKeepAlive(this Socket socket, int tcpKeepAliveInterval, int tcpKeepAliveTime, int tcpKeepAliveRetryCount)
          {
              socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
              socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, tcpKeepAliveInterval);
              socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, tcpKeepAliveTime);
              socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, tcpKeepAliveRetryCount);
          }

          具體的開啟,還需要看操作系統(tǒng)的版本以及不同操作系統(tǒng)的支持。

          粘包斷包處理

          Pipe & ReadOnlySequence

          上圖來自微軟官方博客:https://devblogs.microsoft.com/dotnet/system-io-pipelines-high-performance-io-in-net/

          TCP面向應(yīng)用是流式數(shù)據(jù)傳輸,所以接收端接到的數(shù)據(jù)是像流水一樣從管道中傳來,每次取到的數(shù)據(jù)取決于應(yīng)用設(shè)置的緩沖區(qū)大小,以及套接字本身緩沖區(qū)待讀取字節(jié)數(shù)

          C#中提供的Pipe就如上圖一樣,是一個(gè)管道Pipe有兩個(gè)對(duì)象成員,一個(gè)是PipeWriter,一個(gè)是PipeReader,可以理解為一個(gè)是生產(chǎn)者,專門往管道里灌輸數(shù)據(jù)流,即字節(jié)流,一個(gè)是消費(fèi)者,專門從管道里獲取字節(jié)流進(jìn)行處理。

          可以看到Pipe中的數(shù)據(jù)包是用鏈表關(guān)聯(lián)的,但是這個(gè)數(shù)據(jù)包是從Socke緩沖區(qū)每次取到的數(shù)據(jù)包,它不一定是一個(gè)完整的數(shù)據(jù)包,所以這些數(shù)據(jù)包連接起來后形成了一個(gè)C#提供的另外一個(gè)抽象的對(duì)象ReadOnlySequence。

          但是這里還是沒有提供太好的處理斷包和粘包的辦法,因?yàn)閿喟嘲奶幚硇枰獌煞矫?/span>

          1、業(yè)務(wù)數(shù)據(jù)包的定義

          2、數(shù)據(jù)流切割出一個(gè)個(gè)完整的數(shù)據(jù)包

          假設(shè)業(yè)務(wù)已經(jīng)定義好了數(shù)據(jù)包,那么我們?nèi)绾螐腜ipe中這些數(shù)據(jù)包根據(jù)業(yè)務(wù)定義來從不同的數(shù)據(jù)包中切割出一個(gè)完整的包,那么就需要ReadOnlySequence,它提供的操作方法,非常方便我們?nèi)デ懈顢?shù)據(jù),主要是頭尾數(shù)據(jù)包的切割。

          假設(shè)我們業(yè)務(wù)層定義了一個(gè)數(shù)據(jù)包結(jié)構(gòu),數(shù)據(jù)包是不定長(zhǎng)的,包體長(zhǎng)度每次都寫在包頭里,我們來實(shí)現(xiàn)一個(gè)數(shù)據(jù)包過濾器。

          //收到消息
           while (!_receiveDataTokenSource.Token.IsCancellationRequested)
           {
               try
               {
                  //從pipe中獲取緩沖區(qū)
                   Memory<byte> buffer = _pipeWriter.GetMemory(_options.BufferSize);
                   int readCount = 0;
                   readCount = await _sslStream.ReadAsync(buffer, _lifecycleTokenSource.Token).ConfigureAwait(false);

                   if (readCount > 0)
                   {

                       var data = buffer.Slice(0, readCount);
                       //告知消費(fèi)者,往Pipe的管道中寫入了多少字節(jié)數(shù)據(jù)
                       _pipeWriter.Advance(readCount);
                   }
                   else
                   {
                       if (IsDisconnect())
                       {
                           await DisConnectAsync();
                       }

                       throw new SocketException();
                   }

                   FlushResult result = await _pipeWriter.FlushAsync().ConfigureAwait(false);
                   if (result.IsCompleted)
                   {
                       break;
                   }
               }
               catch (IOException)
               {
                   //TODO log
                   break;
               }
               catch (SocketException)
               {
                   //TODO log
                   break;
               }
               catch (TaskCanceledException)
               {
                   //TODO log
                   break;
               }
           }

           _pipeWriter.Complete();
          //消費(fèi)者處理數(shù)據(jù)
           while (!_lifecycleTokenSource.Token.IsCancellationRequested)
           {
               ReadResult result = await _pipeReader.ReadAsync();
               ReadOnlySequence<byte> buffer = result.Buffer;
               ReadOnlySequence<byte> data;
               do
               {
                  //通過過濾器得到一個(gè)完整的包
                   data = _receivePackageFilter.ResolvePackage(ref buffer);

                   if (!data.IsEmpty)
                   {
                       OnReceivedData?.Invoke(thisnew ClientDataReceiveEventArgs(data.ToArray()));
                   }
               }
               while (!data.IsEmpty && buffer.Length > 0);
               _pipeReader.AdvanceTo(buffer.Start);
           }

           _pipeReader.Complete();
          /// <summary>
          /// 解析數(shù)據(jù)包
          /// 固定報(bào)文頭解析協(xié)議
          /// </summary>
          /// <param name="headerSize">數(shù)據(jù)報(bào)文頭的大小</param>
          /// <param name="bodyLengthIndex">數(shù)據(jù)包大小在報(bào)文頭中的位置</param>
          /// <param name="bodyLengthBytes">數(shù)據(jù)包大小在報(bào)文頭中的長(zhǎng)度</param>
          /// <param name="IsLittleEndian">數(shù)據(jù)報(bào)文大小端。windows中通常是小端,unix通常是大端模式</param>
          /// </summary>
          /// <param name="sequence">一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包</param>
          public override ReadOnlySequence<byteResolvePackage(ref ReadOnlySequence<byte> sequence)
          {
              var len = sequence.Length;
              if (len < _bodyLengthIndex) return default;
              var bodyLengthSequence = sequence.Slice(_bodyLengthIndex, _bodyLengthBytes);
              byte[] bodyLengthBytes = ArrayPool<byte>.Shared.Rent(_bodyLengthBytes);
              try
              {
                  int index = 0;
                  foreach (var item in bodyLengthSequence)
                  {
                      Array.Copy(item.ToArray(), 0, bodyLengthBytes, index, item.Length);
                      index += item.Length;
                  }

                  long bodyLength = 0;
                  int offset = 0;
                  if (!_isLittleEndian)
                  {
                      offset = bodyLengthBytes.Length - 1;
                      foreach (var bytes in bodyLengthBytes)
                      {
                          bodyLength += bytes << (offset * 8);
                          offset--;
                      }
                  }
                  else
                  {

                      foreach (var bytes in bodyLengthBytes)
                      {
                          bodyLength += bytes << (offset * 8);
                          offset++;
                      }
                  }

                  if (sequence.Length < _headerSize + bodyLength)
                      return default;

                  var endPosition = sequence.GetPosition(_headerSize + bodyLength);
                  var data = sequence.Slice(0, endPosition);//得到完整數(shù)據(jù)包
                  sequence = sequence.Slice(endPosition);//緩沖區(qū)中去除取到的完整包

                  return data;
              }
              finally
              {
                  ArrayPool<byte>.Shared.Return(bodyLengthBytes);
              }
          }

          以上就是實(shí)現(xiàn)了固定數(shù)據(jù)包頭實(shí)現(xiàn)粘包斷包處理的部分代碼。

          關(guān)于TCP的連接還有一些,比如客戶端連接限制,空閑連接關(guān)閉等。

          如果大家對(duì)于完整代碼感興趣,可以看我剛寫的一個(gè)TCP庫

          EasyTcp4Net

          https://github.com/BruceQiu1996/EasyTcp4Net

          轉(zhuǎn)自:BruceNeter

          鏈接:cnblogs.com/qwqwQAQ/p/18087456







          回復(fù) 【關(guān)閉】學(xué)永久關(guān)閉App開屏廣告
          回復(fù) 【刪除】學(xué)自動(dòng)檢測(cè)那個(gè)微信好友刪除、拉黑
          回復(fù) 【手冊(cè)】獲取3萬字.NET、C#工程師面試手冊(cè)
          回復(fù) 【幫助】獲取100+個(gè)常用的C#幫助類庫
          回復(fù) 【加群】加入DotNet學(xué)習(xí)交流群

          瀏覽 136
          10點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          10點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产视频情 | 日产精品久久久久久久 | 欧美成人精品无 | 亚洲无码成人在线播放 | 日韩高清一区 |