.NET并發(fā)編程-TPL Dataflow并行工作流
本系列學(xué)習(xí)在.NET中的并發(fā)并行編程模式,實戰(zhàn)技巧
本小節(jié)了解TPL Dataflow并行工作流,在工作中如何利用現(xiàn)成的類庫處理數(shù)據(jù)。旨在通過TDF實現(xiàn)數(shù)據(jù)流的并行處理。
TDF Block
數(shù)據(jù)流由一個一個的塊組成,一個塊處理完畢后鏈接到下一個塊上。每一個塊以消息的形式接收和緩存來自一個或多個源的數(shù)據(jù),當(dāng)接收到信息時,塊通過將其行為應(yīng)用于輸入來作出反應(yīng),塊的輸出將傳遞到下一個塊中。
TDF并不是作為.NET4.5框架的一部分分發(fā),需要單獨安裝,用過nuget導(dǎo)入Microsoft.Tpl.Dataflow。4.5之上在System.Threading.Tasks.Dataflow類庫中。TDF提供了一組豐富的組件(塊),用于基于進(jìn)程內(nèi)消息傳遞語義來組合數(shù)據(jù)流和管道基礎(chǔ)設(shè)施。
TDF最常用的塊是標(biāo)準(zhǔn)的BufferBlock、ActionBlock和TransformBlock。它們每個都基于一個委托,該委托可以是匿名函數(shù)的形式,用于定義要計算的工作。
BufferBlock<TInput>
BufferBlock

以下展示基于TDF BufferBlock的生產(chǎn)者消費者模式
BufferBlock<int> buffer = new BufferBlock<int>();
async Task Producer(IEnumerable<int> values)
{
foreach (var value in values)
await buffer.SendAsync(value);
buffer.Complete();
}
async Task Consumer(Action<int> process)
{
while (await buffer.OutputAvailableAsync())
process(await buffer.ReceiveAsync());
}
public async Task Run()
{
IEnumerable<int> range = Enumerable.Range(0, 100);
await Task.WhenAll(Producer(range), Consumer(n =>
Console.WriteLine($"value {n}")));
}
IEnumerable值的條目通過buffer.Post方法發(fā)送到BufferBlock緩沖區(qū),并使用buffer.ReceiveAsync方法異步檢索它們。OutputAvailableAsync方法用于當(dāng)下一個條目準(zhǔn)備好可被檢索時發(fā)出通知。
TransformBlock<TInput,TOutput>
用于映射轉(zhuǎn)換,該轉(zhuǎn)換函數(shù)以委托Func<TInput,TOutput>的形式作為參數(shù)傳遞。

給定一組地址下載圖片為例
var fetchImageFlag = new TransformBlock<string, (string, byte[])>(
async urlImage =>
{
using (var webClient = new WebClient())
{
byte[] data = await webClient.DownloadDataTaskAsync(urlImage);
return (urlImage, data);
}
});
List<string> urlFlags = new List<string>{
"Italy#/media/File:Flag_of_Italy.svg",
"Spain#/media/File:Flag_of_Spain.svg",
"United_States#/media/File:Flag_of_the_United_States.svg"
};
foreach (var urlFlag in urlFlags)
fetchImageFlag.Post($"https://en.wikipedia.org/wiki/{urlFlag}");
TransformBlock<string, (string, byte[]) 塊以元組字符串和字節(jié)數(shù)組格式來提取標(biāo)記圖像。轉(zhuǎn)換得到字節(jié)數(shù)組對象后,此處還沒有消費使用。下面通過另一個塊組合將其保存到本地。
ActionBlock<TInput>
通過名稱就可以看出,該塊用于接收數(shù)據(jù)時調(diào)用一個委托去處理。因為它沒有輸出,所以通常用于工作流的結(jié)束節(jié)點上。

前面通過轉(zhuǎn)換塊將圖片地址下載轉(zhuǎn)換成了字節(jié)數(shù)組,下面通過ActionBlock將其持久化本地。
var saveData = new ActionBlock<(string, byte[])>(async data =>
{
(string urlImage, byte[] image) = data;
string filePath = urlImage.Substring(urlImage.IndexOf("File:") + 5);
await Agents.File.WriteAllBytesAsync(filePath, image);
});
fetchImageFlag.LinkTo(saveData);
ActionBlock塊實例化傳遞給構(gòu)造函數(shù)的參數(shù)可以是委托Action
最后粘貼一下File的擴(kuò)展方法,用于異步讀寫文件。
public static class File
{
public static async Task<string[]> ReadAllLinesAsync(string path)
{
using (var sourceStream = new FileStream(path,
FileMode.Open, FileAccess.Read, FileShare.None,
bufferSize: 4096, useAsync: true))
using (var reader = new StreamReader(sourceStream))
{
var fileText = await reader.ReadToEndAsync();
return fileText.Split(new[] { Environment.NewLine }, StringSplitOptions.None);
}
}
public static async Task WriteAllTextAsync(string path, string contents)
{
byte[] encodedText = Encoding.Unicode.GetBytes(contents);
await WriteAllBytesAsync(path, encodedText);
}
public static async Task WriteAllBytesAsync(string path, byte[] bytes)
{
using (var sourceStream = new FileStream(path,
FileMode.Append, FileAccess.Write, FileShare.None,
bufferSize: 4096, useAsync: true))
{
await sourceStream.WriteAsync(bytes, 0, bytes.Length);
};
}
}
ending
第一次做人,何不痛痛快快,瀟瀟灑灑,討好自己
工作認(rèn)認(rèn)真真的完成,生活充充實實的過著

