如何運(yùn)用并行編程Parallel提升任務(wù)執(zhí)行效率

本文來(lái)自小易,【DoTNET技術(shù)圈】公眾號(hào)已獲得轉(zhuǎn)載授權(quán)。
《.NET并發(fā)變成實(shí)戰(zhàn)》讀后感:并行編程Parallel
手打目錄:
一、前言
二、任務(wù)并行庫(kù)(TPL)的介紹
三、Parallel.Invoke的使用
四、Parallel.For的使用
五、Parallel.ForEach+Partitioner的使用
六、指定最大并行度MaxDegreeOfParallelism
七、退出循環(huán)以及捕捉異常
八、參考的資料
一、前言
背景:在物聯(lián)網(wǎng)場(chǎng)景下,由于數(shù)據(jù)吞吐量較大,常規(guī)的Task異步執(zhí)行存在明顯的性能瓶頸,后通過(guò)參考Riccardo Terrel(里卡爾多Dian·特雷爾)著,葉偉民老師翻譯的《.NET并發(fā)編程實(shí)戰(zhàn)》,使用了Parallel并行編程,以及分區(qū)器Partitioner,將兩者結(jié)合使用提高了設(shè)備數(shù)據(jù)綁定及數(shù)據(jù)更新速度,也做到了對(duì)CPU的性能比較極致使用。
萌新記錄,大佬多加斧正!
可跳過(guò)概念,直接抵達(dá)使用實(shí)例—>五、Parallel.ForEach+Partitioner的使用
l 并行編程的原理
在《.net并發(fā)編程實(shí)戰(zhàn)》(以下稱《實(shí)戰(zhàn)》)中這樣解釋并行編程——同時(shí)執(zhí)行多個(gè)任務(wù)。
從開發(fā)人員的角度看,當(dāng)我們考慮這些問(wèn)題是,“我的程序可以同時(shí)執(zhí)行多項(xiàng)操作嗎?”或“我的程序如何更快地解決一個(gè)問(wèn)題”我們會(huì)想到并行。并行是指同時(shí)在不同的內(nèi)核上執(zhí)行多個(gè)任務(wù),以提高應(yīng)用程序的速度,這需要硬件支持(多核),且并行只能在多核設(shè)備中實(shí)現(xiàn),是提高程序性能和吞吐量的手段。
l 并行與并發(fā)編程簡(jiǎn)單區(qū)分
1、 并發(fā)編程一次處理多個(gè)操作,不需要硬件支持(使用一個(gè)或多個(gè)內(nèi)核)。
2、 并行編程在多個(gè)CPU或多個(gè)內(nèi)核上同時(shí)執(zhí)行多個(gè)操作。所有并行程序都是并發(fā)的,同時(shí)運(yùn)行的,但并非所有并發(fā)都是并行的。原因是并行只能在多核設(shè)備上實(shí)現(xiàn)。
3、 多任務(wù)同時(shí)執(zhí)行來(lái)自不同進(jìn)程的多個(gè)線程。多任務(wù)并不一定意味著并行執(zhí)行,只有在使用多個(gè)CPU 或多個(gè)內(nèi)核時(shí)才能實(shí)現(xiàn)并行執(zhí)行。

l 為什么需要使用并行編程
《實(shí)戰(zhàn)》對(duì)不同程序CPU使用資源使用的程度做了一個(gè)對(duì)比:

《實(shí)戰(zhàn)》中認(rèn)為,在一臺(tái)多核計(jì)算機(jī)上運(yùn)行一個(gè)沒(méi)有考慮到并發(fā)的應(yīng)用程序,就是在浪費(fèi)計(jì)算機(jī)的生產(chǎn)力,因?yàn)閼?yīng)用程序在順序處理過(guò)程中只能使用一部分可用的計(jì)算能力,在這種情況下任何CPU性能計(jì)數(shù)器會(huì)發(fā)現(xiàn)只有一個(gè)內(nèi)核運(yùn)行得很快,可能為100%,而其他內(nèi)核未充分利用或空閑,在上圖的8內(nèi)核的計(jì)算機(jī)中,運(yùn)行的非并行程序意味著資源的總體使用率可能不到15%。
l 使用并行編程的兩種方式
1、 任務(wù)并行庫(kù)(TPL),本文中只使用了這種方式
2、 并行LINQ(PLINQ)—》官方文檔直達(dá):https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-programming/introduction-to-plinq
二、任務(wù)并行庫(kù)(TPL)的并行介紹
.Net Framework4 引入了新的Task Parallel Library(任務(wù)并行庫(kù),TPL),它支持?jǐn)?shù)據(jù)并行、任務(wù)并行和流水線。
當(dāng)并行循環(huán)運(yùn)行時(shí),TPL會(huì)將數(shù)據(jù)源按照內(nèi)置的分區(qū)算法(或者你可以自定義一個(gè)分區(qū)算法)將數(shù)據(jù)劃分為多個(gè)不相交的子集,然后,從線程池中選擇線程并行地處理這些數(shù)據(jù)子集,每個(gè)線程只負(fù)責(zé)處理一個(gè)數(shù)據(jù)子集。在后臺(tái),任務(wù)計(jì)劃程序?qū)⒏鶕?jù)系統(tǒng)資源和工作負(fù)荷來(lái)對(duì)任務(wù)進(jìn)行分區(qū)。如有可能,計(jì)劃程序會(huì)在工作負(fù)荷變得不平衡的情況下在多個(gè)線程和處理器之間重新分配工作。
在對(duì)任何代碼(包括循環(huán))進(jìn)行并行化時(shí),一個(gè)重要的目標(biāo)是利用盡可能多的處理器,但不要過(guò)度并行化到使行處理的開銷讓任何性能優(yōu)勢(shì)消耗殆盡的程度。比如:對(duì)于嵌套循環(huán),只會(huì)對(duì)外部循環(huán)進(jìn)行并行化,原因是不會(huì)在內(nèi)部循環(huán)中執(zhí)行太多工作。少量工作和不良緩存影響的組合可能會(huì)導(dǎo)致嵌套并行循環(huán)的性能降低。
由于循環(huán)體是并行運(yùn)行的,迭代范圍的分區(qū)是根據(jù)可用的邏輯內(nèi)核數(shù)、分區(qū)大小以及其他因素動(dòng)態(tài)變化的,因此無(wú)法保證迭代的執(zhí)行順序。
TPL引入了System.Threading.Tasks ,主類是Task,這個(gè)類表示一個(gè)異步的并發(fā)的操作,然而我們不一定要使用Task類的實(shí)例,可以使用Parallel靜態(tài)類。它提供了Parallel.Invoke, Parallel.For,Parallel.Forecah 三個(gè)方法,以下分別介紹3個(gè)方法的簡(jiǎn)單實(shí)例,每個(gè)方法都有多個(gè)重載,可自行查看源代碼
三、Parallel.Invoke的使用
static void Main(){try{Parallel.Invoke(BasicAction,// Param #0 - 靜態(tài)方法() =>// Param #1 - lambda表達(dá)式{Console.WriteLine("干飯人干飯, Thread={0}", Thread.CurrentThread.ManagedThreadId);},delegate ()// Param #2 - 委托{Console.WriteLine("委托方法中, Thread={0}", Thread.CurrentThread.ManagedThreadId);});}// 在本例中不期望出現(xiàn)異常,但如果任務(wù)中仍然拋出異常,// 它將被包裝在AggregateException中,并傳播到主線程。catch (AggregateException e){Console.WriteLine("捕捉異常 \n{0}", e.InnerException.ToString());}}static void BasicAction(){Console.WriteLine("打工人打工, Thread={0}", Thread.CurrentThread.ManagedThreadId);}

注解:
l 此方法可用于執(zhí)行可能并行執(zhí)行的一組操作。
l 不保證執(zhí)行操作的順序,或是否并行執(zhí)行操作。
l 此方法在每個(gè)提供的操作都已完成后才會(huì)返回,無(wú)論是由于正常終止還是異常終止而發(fā)生。
四、Parallel.For的使用
我們先用一個(gè)簡(jiǎn)單的插入,來(lái)比較并行的for循環(huán)與串行for循環(huán)的速度。

這里因?yàn)镻arallel.For在對(duì)處理器分配任務(wù)時(shí)候也有性能消耗,速度提升并不明顯。
接下來(lái)我們看一下Parallel.For的其中重載之一

var list = new List<int>() { 10, 20, 30, 40 };var options = new ParallelOptions();var total = 0;var result = Parallel.For(0, list.Count, () =>{Console.WriteLine("------------ thead --------------");return 1;},(i, loop, j) =>{Console.WriteLine("------------ body --------------");Console.WriteLine("i=" + list[i] + " j=" + j);return list[i];},(b) =>{Console.WriteLine("------------ tfoot --------------");Interlocked.Add(ref total, b);Console.WriteLine("total=" + total);});Console.WriteLine("iscompleted:" + result.IsCompleted);Console.Read();
注解:
l 因?yàn)椴⑿腥蝿?wù)當(dāng)中不保證執(zhí)行順序,且多任務(wù)可能會(huì)同時(shí)嘗試更新total變量,所以這里使用了 Interlocked.Add執(zhí)行,來(lái)保證它是作為原子操作來(lái)執(zhí)行。
五、Parallel.ForEach+Partitioner的結(jié)合使用
Partitioner分區(qū)器:
首先我們來(lái)看看分區(qū)器源代碼,看他是如何對(duì)數(shù)據(jù)源進(jìn)行分區(qū)的:

Partitioner.Create 若只指定的數(shù)據(jù)源的起始于結(jié)束的索引位置,創(chuàng)建分區(qū)則主要是根據(jù)邏輯內(nèi)核數(shù)(PlatformHelper.ProcessorCount)決定的。

大部分情況下,TPL在幕后使用的負(fù)載均衡機(jī)制都是非常高效的,比如我們不使用分區(qū)器,直接對(duì)數(shù)據(jù)源進(jìn)行負(fù)載均衡的并行執(zhí)行,案例請(qǐng)看—>六、指定最大并行度。
當(dāng)然我們也可以自定義分區(qū)大小,以下我們進(jìn)入到實(shí)際的開發(fā)環(huán)境中,當(dāng)前實(shí)驗(yàn)電腦為6核12線程處理器
注解:
dataList —>實(shí)時(shí)數(shù)據(jù)的數(shù)據(jù)源
Index —>數(shù)據(jù)源總數(shù),此處假設(shè)1W條數(shù)據(jù)
rangesize—>區(qū)塊大小,由此可以計(jì)算 10000/12+1=834(+1是為了適應(yīng)可能除不盡的情況)
Partitioner.Create(0,Index,rangesize) —>分區(qū)器將數(shù)據(jù)源0-1W條數(shù)據(jù)分成了12個(gè)數(shù)據(jù)塊,每一塊為834條,當(dāng)然最后一塊沒(méi)有834條數(shù)據(jù)


打上斷點(diǎn)可以看到range.Item2-range.Item1=834,已經(jīng)分好區(qū)塊了,然后就是并行處理業(yè)務(wù)代碼了。
這里貼上示例,粘貼可用:
int index = 10000;var rangesize = (int)(index / Environment.ProcessorCount) + 1;var rangePartitioner = Partitioner.Create(1, index, rangesize);System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>{});
六、指定最大并行度MaxDegreeOfParallelism
參考博客文章:Parallel.ForEach 之 MaxDegreeOfParallelism
https://www.cnblogs.com/QinQouShui/p/12134232.html
System.Threading.Tasks.Parallel.ForEach(list, new ParallelOptions() { MaxDegreeOfParallelism = 12 }, range =>{#region 業(yè)務(wù)代碼#endregion});
此Parallel.ForEach并沒(méi)有使用分區(qū)器,而是用TPL進(jìn)行負(fù)載均衡的并行。
該重載的源代碼為:

七、退出循環(huán)以及捕捉異常
和串行運(yùn)行中的break不同,ParallelLoopState 提供了兩個(gè)方法用于停止Parallel.For 和 Parallel.ForEach的執(zhí)行。
public class ParallelLoopState{// 獲取循環(huán)的任何迭代是否已引發(fā)相應(yīng)迭代未處理的異常。public bool IsExceptional { get; }// 獲取循環(huán)的任何迭代是否已調(diào)用 ParallelLoopState.Stop()。public bool IsStopped { get; }// 獲取在Parallel循環(huán)中調(diào)用 ParallelLoopState.Break() 的最低循環(huán)迭代。public long? LowestBreakIteration { get; }// 獲取循環(huán)的當(dāng)前迭代是否應(yīng)基于此迭代或其他迭代發(fā)出的請(qǐng)求退出。public bool ShouldExitCurrentIteration { get; }//通知Parallel循環(huán)當(dāng)前迭代”之后”的其他迭代不需要運(yùn)行。public void Break();//通知Parallel循環(huán)當(dāng)前迭代“之外”的所有其他迭代不需要運(yùn)行。public void Stop();}
l Break:用于通知Parallel循環(huán)當(dāng)前迭代“之后”的其他迭代不需要運(yùn)行。例如,對(duì)于從 0 到 1000 并行迭代的 for 循環(huán),如果在第 100 次迭代調(diào)用 Break(),則低于 100 的所有迭代仍會(huì)運(yùn)行(即使還未開始處理),并在退出循環(huán)之前處理完。從 101 到 1000 中還未開啟的迭代則會(huì)被放棄。對(duì)于已經(jīng)在執(zhí)行的長(zhǎng)時(shí)間運(yùn)行迭代,Break()將為已運(yùn)行還未結(jié)束的迭代對(duì)應(yīng)ParallelLoopResult結(jié)構(gòu)的LowestBreakIteration屬性設(shè)置為調(diào)用Bread()迭代項(xiàng)的索引。
l Stop:Stop() 用于通知Parallel循環(huán)當(dāng)前迭代“之外”的所有其他迭代不需要運(yùn)行,無(wú)論它們是位于當(dāng)前迭代的上方還是下方。對(duì)于已經(jīng)在執(zhí)行的長(zhǎng)時(shí)間運(yùn)行迭代,可以檢查 IsStopped屬性,在觀測(cè)到是 true 時(shí)提前退出。Stop 通常在基于搜索的算法中使用,在找到一個(gè)結(jié)果之后就不需要執(zhí)行其他任何迭代。(比如在看視頻或漫畫時(shí)自動(dòng)匹配響應(yīng)最快的服務(wù)器)
var loopresult = System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>{loopState.Stop();});
當(dāng)并行迭代中調(diào)用的委托拋出異常,這個(gè)異常沒(méi)有在委托中被捕獲到時(shí),就會(huì)變成一組異常,新的System.AggregateException負(fù)責(zé)處理這一組異常。
try{System.Threading.Tasks.Parallel.ForEach(rangePartitioner, range =>{});}Catch(AggregateException ex){foreach (var innerEx in ex.InnerExceptions){Console.WriteLine(innerEx.ToString());}}
八、參考的資料
l 《.net并發(fā)編程實(shí)戰(zhàn)》
l 官方文檔《.NET 中的并行編程》https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-programming/
l 博客園《.Net并行編程高級(jí)教程--Parallel》https://www.cnblogs.com/stoneniqiu/p/4857021.html
l 博客園《8天玩轉(zhuǎn)并發(fā)》
https://www.cnblogs.com/huangxincheng/category/368987.html
l 《異步編程:.NET4.X 數(shù)據(jù)并行》
https://www.cnblogs.com/heyuquan/archive/2013/03/13/parallel-for-foreach-invoke.html
l 博客園《Parallel.ForEach 之 MaxDegreeOfParallelism》
https://www.cnblogs.com/QinQouShui/p/12134232.html
end
【推薦】.NET Core開發(fā)實(shí)戰(zhàn)視頻課程 ★★★
.NET Core實(shí)戰(zhàn)項(xiàng)目之CMS 第一章 入門篇-開篇及總體規(guī)劃
【.NET Core微服務(wù)實(shí)戰(zhàn)-統(tǒng)一身份認(rèn)證】開篇及目錄索引
Redis基本使用及百億數(shù)據(jù)量中的使用技巧分享(附視頻地址及觀看指南)
.NET Core中的一個(gè)接口多種實(shí)現(xiàn)的依賴注入與動(dòng)態(tài)選擇看這篇就夠了
10個(gè)小技巧助您寫出高性能的ASP.NET Core代碼
用abp vNext快速開發(fā)Quartz.NET定時(shí)任務(wù)管理界面
在ASP.NET Core中創(chuàng)建基于Quartz.NET托管服務(wù)輕松實(shí)現(xiàn)作業(yè)調(diào)度
現(xiàn)身說(shuō)法:實(shí)際業(yè)務(wù)出發(fā)分析百億數(shù)據(jù)量下的多表查詢優(yōu)化
