<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          .NET并發(fā)編程-TPL Dataflow并行工作流

          共 6315字,需瀏覽 13分鐘

           ·

          2021-05-14 07:40

          本系列學(xué)習(xí)在.NET中的并發(fā)并行編程模式,實戰(zhàn)技巧

          本小節(jié)了解TPL Dataflow并行工作流,在工作中如何利用現(xiàn)成的類庫處理數(shù)據(jù)。旨在通過TDF實現(xiàn)數(shù)據(jù)流的并行處理。

          TDF Block

          數(shù)據(jù)流由一個一個的塊組成,一個塊處理完畢后鏈接到下一個塊上。每一個塊以消息的形式接收和緩存來自一個或多個源的數(shù)據(jù),當(dāng)接收到信息時,塊通過將其行為應(yīng)用于輸入來作出反應(yīng),塊的輸出將傳遞到下一個塊中。

          TDF并不是作為.NET4.5框架的一部分分發(fā),需要單獨安裝,用過nuget導(dǎo)入Microsoft.Tpl.Dataflow。4.5之上在System.Threading.Tasks.Dataflow類庫中。TDF提供了一組豐富的組件(塊),用于基于進(jìn)程內(nèi)消息傳遞語義來組合數(shù)據(jù)流和管道基礎(chǔ)設(shè)施。

          TDF最常用的塊是標(biāo)準(zhǔn)的BufferBlock、ActionBlock和TransformBlock。它們每個都基于一個委托,該委托可以是匿名函數(shù)的形式,用于定義要計算的工作。

          BufferBlock<TInput>

          BufferBlock是一個很好的工具,用于啟用和實現(xiàn)異步生產(chǎn)者/消費者模式,其中內(nèi)部的消息隊列可以由多個源寫入或從多個目標(biāo)讀取。保證先進(jìn)先出的順序。

          以下展示基于TDF BufferBlock的生產(chǎn)者消費者模式

          BufferBlock<int> buffer = new BufferBlock<int>(); 
          async Task Producer(IEnumerable<int> values)
          {
              foreach (var value in values)
                  await buffer.SendAsync(value);    
              buffer.Complete();         
          }
          async Task Consumer(Action<int> process)
          {
              while (await buffer.OutputAvailableAsync()) 
                  process(await buffer.ReceiveAsync());   
          }
          public async Task Run()
          {
              IEnumerable<int> range = Enumerable.Range(0100);
              await Task.WhenAll(Producer(range), Consumer(n =>
                  Console.WriteLine($"value {n}")));
          }

          IEnumerable值的條目通過buffer.Post方法發(fā)送到BufferBlock緩沖區(qū),并使用buffer.ReceiveAsync方法異步檢索它們。OutputAvailableAsync方法用于當(dāng)下一個條目準(zhǔn)備好可被檢索時發(fā)出通知。

          TransformBlock<TInput,TOutput>

          用于映射轉(zhuǎn)換,該轉(zhuǎn)換函數(shù)以委托Func<TInput,TOutput>的形式作為參數(shù)傳遞。

          給定一組地址下載圖片為例

          var fetchImageFlag = new TransformBlock<string, (string, byte[])>(
              async urlImage =>
              { 
                  using (var webClient = new WebClient())
                  {
                      byte[] data = await webClient.DownloadDataTaskAsync(urlImage); 
                      return (urlImage, data);
                  }  
              });
          List<string> urlFlags = new List<string>{
              "Italy#/media/File:Flag_of_Italy.svg",
              "Spain#/media/File:Flag_of_Spain.svg",
              "United_States#/media/File:Flag_of_the_United_States.svg"
              };
          foreach (var urlFlag in urlFlags)
              fetchImageFlag.Post($"https://en.wikipedia.org/wiki/{urlFlag}");

          TransformBlock<string, (string, byte[]) 塊以元組字符串和字節(jié)數(shù)組格式來提取標(biāo)記圖像。轉(zhuǎn)換得到字節(jié)數(shù)組對象后,此處還沒有消費使用。下面通過另一個塊組合將其保存到本地。

          ActionBlock<TInput>

          通過名稱就可以看出,該塊用于接收數(shù)據(jù)時調(diào)用一個委托去處理。因為它沒有輸出,所以通常用于工作流的結(jié)束節(jié)點上。

          前面通過轉(zhuǎn)換塊將圖片地址下載轉(zhuǎn)換成了字節(jié)數(shù)組,下面通過ActionBlock將其持久化本地。

          var saveData = new ActionBlock<(string, byte[])>(async data =>
          {
              (string urlImage, byte[] image) = data; 
              string filePath = urlImage.Substring(urlImage.IndexOf("File:") + 5);
              await Agents.File.WriteAllBytesAsync(filePath, image); 
          });
          fetchImageFlag.LinkTo(saveData);    

          ActionBlock塊實例化傳遞給構(gòu)造函數(shù)的參數(shù)可以是委托Action或Func<TInput,Task>。后者對每個消息輸入異步執(zhí)行內(nèi)部操作。最后ActionBlock塊saveData使用LinkTo擴(kuò)展方法連接到前面的TransformBlock塊上。通過這種方式,TransformBlock生成的輸出會在可用時被立即推送到ActionBlock中。

          最后粘貼一下File的擴(kuò)展方法,用于異步讀寫文件。

          public static class File
          {

              public static async Task<string[]> ReadAllLinesAsync(string path)
              {
                  using (var sourceStream = new FileStream(path,
                      FileMode.Open, FileAccess.Read, FileShare.None,
                      bufferSize: 4096, useAsync: true))
                  using (var reader = new StreamReader(sourceStream))
                  {
                      var fileText = await reader.ReadToEndAsync();
                      return fileText.Split(new[] { Environment.NewLine }, StringSplitOptions.None);
                  }
              }
              public static async Task WriteAllTextAsync(string path, string contents)
              
          {
                  byte[] encodedText = Encoding.Unicode.GetBytes(contents);
                  await WriteAllBytesAsync(path, encodedText);
              }
              public static async Task WriteAllBytesAsync(string path, byte[] bytes)
              
          {
                  using (var sourceStream = new FileStream(path,
                      FileMode.Append, FileAccess.Write, FileShare.None,
                      bufferSize: 4096, useAsync: true))
                  {
                      await sourceStream.WriteAsync(bytes, 0, bytes.Length);
                  };
              }
          }

          ending

          第一次做人,何不痛痛快快,瀟瀟灑灑,討好自己

          工作認(rèn)認(rèn)真真的完成,生活充充實實的過著


          瀏覽 71
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大香蕉青草 | 中文天堂在线免费观看 | 人妻夜夜爽天天爽麻豆三区视频 | 久操操操| 国产精品激情五月综合 |