c#異步編程-Task(一)
一、概要
大家好,本次繼續(xù)分享自己的學(xué)習(xí)經(jīng)歷。本文主要分享異步編程中Task的使用,如果能幫助大家希望多多關(guān)注文章末尾的微信公眾號(hào)和知乎三連。各位舉手之勞是對(duì)我更新技術(shù)文章最大的支持。
個(gè)人心得:Task是一個(gè)升級(jí)版本的Thread的類,它非常的靈活支持取消、阻塞等待、合并多個(gè)Task協(xié)同操作、編碼高效易懂、異常傳播、回調(diào)傳遞結(jié)果或調(diào)用方法等。
本文相關(guān)文獻(xiàn)查閱地址:https://docs.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task-1?f1url=%3FappId%3DDev16IDEF1%26l%3DZH-CN%26k%3Dk(System.Threading.Tasks.Task);k(DevLang-csharp)%26rd%3Dtrue&view=net-5.0
二、詳細(xì)內(nèi)容
1.Task
Thread線程是用來(lái)創(chuàng)建并發(fā)的一種低級(jí)別工具,它具有一些限制,尤其是:
雖然開始線程的時(shí)候可以方便的傳入數(shù)據(jù),但是當(dāng)join的時(shí)候很難從線程獲得返回值。
可能需要設(shè)置一些共享字段。
如果操作拋出異常,鋪貨和傳播該異常都很麻煩
無(wú)法告訴線程在結(jié)束時(shí)開始另外的工作,你必須進(jìn)行join操作(在進(jìn)程中阻塞當(dāng)前的線程)
很難使用較小的并發(fā)(concurrent)來(lái)組件大型的并發(fā)
Task類可以很好的解決上述問(wèn)題,它是一個(gè)高級(jí)抽象:它代表了一個(gè)并發(fā)操作(concurrent),該操作可能有Thread支持,或不由Thread支持。
Task是可組合的(可使用continuation把他們穿成鏈)。
Tasks可以使用線程池來(lái)減少啟動(dòng)延遲。
使用TaskCompletionSource,Tasks可以利用回調(diào)的方式,在等待I/O綁定操作時(shí)完全避免使用線程。
開始一個(gè)Task ,Task.Run
開始一個(gè)Task最簡(jiǎn)單的辦法就是使用Task.Run(.net4.5,4.0的時(shí)候是Task.Factory.StartNew)傳入一個(gè)Action委托即可(例子task)
Task.Run(()=>{ Console.WriteLine("do it"); });
Task默認(rèn)使用線程池,也就是后臺(tái)線程:當(dāng)主線程結(jié)束時(shí),你創(chuàng)建所有的tasks都會(huì)結(jié)束。
Task.Run返回一個(gè)Task對(duì)象,可以使用它來(lái)監(jiān)視其過(guò)程
在Task.Run之后,我們沒(méi)有調(diào)用Start,因?yàn)樵摲椒▌?chuàng)建的是“熱”任務(wù)(hot task)
可以通過(guò)task的構(gòu)造函數(shù)創(chuàng)建“冷”任務(wù)(cold task),但開發(fā)中很少這么干
通過(guò)Task的Status屬性來(lái)跟蹤task的執(zhí)行狀態(tài)。
Task.Status枚舉狀態(tài)如下這里就不詳細(xì)分析可以去官方文檔查閱具體用法:
public enum TaskStatus
{
//
// 摘要:
// The task has been initialized but has not yet been scheduled.
Created = 0,
//
// 摘要:
// The task is waiting to be activated and scheduled internally by the .NET Framework
// infrastructure.
WaitingForActivation = 1,
//
// 摘要:
// The task has been scheduled for execution but has not yet begun executing.
WaitingToRun = 2,
//
// 摘要:
// The task is running but has not yet completed.
Running = 3,
//
// 摘要:
// The task has finished executing and is implicitly waiting for attached child
// tasks to complete.
WaitingForChildrenToComplete = 4,
//
// 摘要:
// The task completed execution successfully.
RanToCompletion = 5,
//
// 摘要:
// The task acknowledged cancellation by throwing an OperationCanceledException
// with its own CancellationToken while the token was in signaled state, or the
// task's CancellationToken was already signaled before the task started executing.
// For more information, see Task Cancellation.
Canceled = 6,
//
// 摘要:
// The task completed due to an unhandled exception.
Faulted = 7
}
if (task.Status == TaskStatus.RanToCompletion)
{
//當(dāng)當(dāng)前線程狀態(tài)表示完成時(shí)則執(zhí)行后續(xù)操作
Console.WriteLine("do it");
}
Task.Wait等待
調(diào)用task的wait方法會(huì)進(jìn)行阻塞直到操作完成,相當(dāng)于thread上的join方法。
Task mytask = Task.Run(()=>
{
Thread.Sleep(3000);
Console.WriteLine("do it");
});
Console.WriteLine(mytask.IsCanceled);//false
mytask.Wait();//阻塞主線程直到mytask執(zhí)行完畢
Console.WriteLine(mytask.IsCanceled);//true
wait也可以讓你指定一個(gè)超時(shí)時(shí)間和一個(gè)取消令牌來(lái)提前結(jié)束等待。
Long-running tasks 長(zhǎng)時(shí)間運(yùn)行的任務(wù)
默認(rèn)情況,CLR在線程池中運(yùn)行Task,這非常適合短時(shí)間運(yùn)行的Compute-Bound類工作。
針對(duì)長(zhǎng)時(shí)間允許的任務(wù)或阻塞操作,你可以不用采用線程池
Task task = Task.Factory.StartNew(()=>
{
Thread.Sleep(3000);
Console.WriteLine("do it");
},TaskCreationOptions.LongRunning);如果同時(shí)運(yùn)行多個(gè)long-running tasks(尤其是其中有處于阻塞狀態(tài)的),那么性能將會(huì)受到很大影響,這是有比TaskCreationOptions.LongRunning更好的辦法:
如果任務(wù)是IO-Bound,TaskCompletionSource和異步函數(shù)可以讓你用回調(diào)(Coninuations)代替線程來(lái)實(shí)現(xiàn)并發(fā)。
如果任務(wù)是Compute-Bound,生產(chǎn)者/消費(fèi)者隊(duì)列允許你對(duì)任務(wù)的并發(fā)性限流,避免把其他的線程和進(jìn)程的CPU處理時(shí)間片占盡。
2.Task的返回值
Task有一個(gè)泛型子類叫做Task
,它允許一個(gè)返回值。 使用Func
委托或兼容的Lambda表達(dá)式來(lái)調(diào)用Task.Run就可以得到Task 。 隨后,可以通過(guò)Result屬性來(lái)獲得返回的結(jié)果。
如果這個(gè)task還沒(méi)有完成操作,訪問(wèn)Result屬性會(huì)阻塞該線程知道該task完成操作。
Tasktask = Task.Run(()=> {
Console.WriteLine("do it");
return 666;
});
int result = task.Result;
Console.WriteLine(result);Task
可以看做是一個(gè)所謂的“未來(lái)/許諾”(future、promise),在它里面包裹著一個(gè)Result,在稍后的時(shí)候就會(huì)變得可用。 在CTP版本的時(shí)候,Task
實(shí)際上叫做Future
3.Task的異常
與Thread不一樣,Task可以很方便的傳播異常 如果你的task里面拋出了一個(gè)未處理的異常,那么該異常就會(huì)重新被拋出給:
調(diào)用了wait()的地方
訪問(wèn)了Task
的Reuslt屬性的地方。
代碼如下:
Task mytask = Task.Run(()=> { throw null; });
try
{
mytask.Wait();
}
catch (AggregateException aex)
{
if (aex.InnerExceptions is NullReferenceException)
{
Console.WriteLine("null");
}
else
{
throw;
}
}
CLR將異常包裹在AggregateException里,以便在并行編程場(chǎng)景中發(fā)揮很好的作用。
如果我們不想拋出異常就想知道task有沒(méi)有發(fā)生故障,無(wú)需重新拋出異常,通過(guò)Task的IsFaulted和IsCanceled屬性也可以檢測(cè)出Task是否發(fā)生了故障:
如果兩個(gè)屬性都返回false,那么沒(méi)有錯(cuò)誤發(fā)生。
如果IsCanceled為true,那就說(shuō)明一個(gè)OperationCanceledException為該Task拋出了。
如果IsFaulted為true,那么就說(shuō)明另一個(gè)類型的異常被拋出了,而Exception屬性也將指明錯(cuò)誤。
異常與“自治”的Task
“自治的”,“設(shè)置完就不管”的task。就是指不通過(guò)調(diào)用wait方法、result屬性或continuation進(jìn)行會(huì)合的任務(wù)。
針對(duì)自治的task,需要像Thread一樣,顯式的處理異常,避免發(fā)生“悄無(wú)聲息的故障”。
自治task上未處理的異常成為未觀察到的異常。
未觀察到的異常
可以通過(guò)全局的TaskScheduler.UnobservedTaskException來(lái)訂閱未觀察到的異常。
關(guān)于什么是“未觀察到的異?!保幸恍┘?xì)微的差別:
使用超時(shí)進(jìn)行等待的Task,如果在超時(shí)后發(fā)生故障,那么它將會(huì)產(chǎn)生一個(gè)“未觀察到的異?!?。
在Task發(fā)生故障后,如果訪問(wèn)Task的Exception屬性,那么該異常就被認(rèn)為是“已觀察到的”。
4.Coninuation
一個(gè)Continuation會(huì)對(duì)Task說(shuō):“當(dāng)你結(jié)束的時(shí)候,繼續(xù)在做點(diǎn)其他的事情”
Continuation通常是通過(guò)回調(diào)的方式實(shí)現(xiàn)的
當(dāng)操作一結(jié)束,就開始執(zhí)行
代碼如下:
Task mytask = Task.Run(() =>
{
Console.WriteLine("do it");
return 666;
});
var awaiter = mytask.GetAwaiter();
awaiter.OnCompleted(()=>
{
int result = awaiter.GetResult();
Console.WriteLine(result);
});
在Task上調(diào)用GetAwaiter會(huì)返回一個(gè)awaiter對(duì)象
它的OnCompleted方法會(huì)告訴之前的task:“當(dāng)結(jié)束/發(fā)生故障的時(shí)候要執(zhí)行委托” 。
可以將Continuation附加到已經(jīng)結(jié)束的task上面,此時(shí)continuation將會(huì)被安排立即執(zhí)行。
awaiter
任何可以暴露下列兩個(gè)方法和一個(gè)屬性的對(duì)象就是awaiter:
OnCompleted
GetResult
一個(gè)叫做IsCompleted的bool屬性
沒(méi)有接口或者父類來(lái)統(tǒng)一這些成員。
其中OnCompleted是INotifyCompletion的一部分
如果發(fā)生故障
如果之前的任務(wù)發(fā)生故障,那么當(dāng)continuation代碼調(diào)用awaiter.GetResult()的時(shí)候,異常會(huì)被重新拋出。
無(wú)需調(diào)用GetResult,我們可以直接訪問(wèn)task的Result屬性。
但調(diào)用GetResult的好處是,如果task發(fā)生故障,那么異常會(huì)被直接的拋出,而不是包裹在AggregateException里面,這樣的話catch快就簡(jiǎn)潔了很多。
非泛型task
針對(duì)泛型的task,GetResult()方法有一個(gè)void返回值,它就是用來(lái)重新拋出異常。
同步上下文
如果同步上下文出現(xiàn)了,那么OnCompleted會(huì)自動(dòng)捕獲它,并將Continuation提交到這個(gè)上下文中。這一點(diǎn)在富客戶端應(yīng)用中非常有用,因?yàn)樗鼤?huì)把Continuation放回到UI線程中。
如果是編寫一個(gè)庫(kù),則不希望出現(xiàn)上述行為,因?yàn)殚_銷較大的UI線程切換應(yīng)該再程序運(yùn)行離開庫(kù)的時(shí)候只發(fā)生一次,而不是出現(xiàn)在方法調(diào)用之間。所以,我們可以使用ConfigureAwait方法來(lái)避免這種行為
Taskmytask = Task.Run(() =>
{
Console.WriteLine("do it");
return 666;
});
var awaiter = mytask.ConfigureAwait(false).GetAwaiter();
awaiter.OnCompleted(()=>
{
int result = awaiter.GetResult();
Console.WriteLine(result);
});如果沒(méi)有同步上下文出現(xiàn),或者你使用的是ConfigureAwait(false),那么Continuation會(huì)運(yùn)行在先前的task的同一個(gè)線程上,從而避免不必要的開銷。
ContinueWith
另外一種附加Continuation的方式就是調(diào)用task的Continuewith方法。
Taskmytask = Task.Run(() =>
{
Console.WriteLine("do it");
return 666;
});
mytask.ContinueWith(task=>
{
int result = task.Result;
Console.WriteLine(result);
});Continuewith本身返回一個(gè)task,它可以用它來(lái)附加更多的Continuation。
但是,必須直接處理AggregateException:
如果task發(fā)生故障,需要額外的代碼來(lái)吧Continuation封裝(marshal)到UI應(yīng)用上。
在非UI上下文中,弱項(xiàng)讓Continuation和task執(zhí)行在同一個(gè)線程上,必須制定TaskContinuationOptions.ExecuteSynchronously,否則將它彈回到線程池。
5.TaskCompletionSource
TaskCompletionSource也可以用來(lái)創(chuàng)建Task
TaskCompletionSource讓你在稍后開始和結(jié)束的任意操作中創(chuàng)建Task
它會(huì)為你提供一個(gè)可手動(dòng)執(zhí)行的“從屬”Task
只是操作合適結(jié)束或發(fā)生故障
它對(duì)IO-Bound類工作比較理想
可以獲得所有Task的好處(傳播至、異常、Continuation等)
不需要在操作時(shí)阻塞線程
初始化一個(gè)實(shí)例即可
它有一個(gè)Task屬性可返回一個(gè)Task
該Task完全由TaskCompletionSource對(duì)象控制
調(diào)用任意一個(gè)方法都會(huì)給Task發(fā)信號(hào):
完成、故障、取消
這些方法只能調(diào)用一次,如果再次調(diào)用:
SetXXX會(huì)拋出異常
TryXXX會(huì)返回false
方法源碼如下:
public class TaskCompletionSource
{
public TaskCompletionSource();
public TaskCompletionSource(object? state);
public TaskCompletionSource(TaskCreationOptions creationOptions);
public TaskCompletionSource(object? state, TaskCreationOptions creationOptions);
public Task Task { get; }
public void SetCanceled();
public void SetException(IEnumerable exceptions) ;
public void SetException(Exception exception);
public void SetResult(TResult result);
public bool TrySetCanceled();
public bool TrySetCanceled(CancellationToken cancellationToken);
public bool TrySetException(IEnumerable exceptions) ;
public bool TrySetException(Exception exception);
public bool TrySetResult(TResult result);
}
使用示例代碼:
/*
*CODE1
*/
var tcs = new TaskCompletionSource();
new Thread(() =>
{
Thread.Sleep(5000);
tcs.SetResult(42);
})
{
IsBackground = true
}.Start();
Task task = tcs.Task;
Console.WriteLine(task.Result);
/*CODE2
* 調(diào)用此方法相當(dāng)于調(diào)用Task.Factory.StartNew
* 并使用TaskCreationOptions.LongRunning選項(xiàng)來(lái)創(chuàng)建非線程池的線程
*/
Task Run(Func func)
{
var tcs = new TaskCompletionSource();
new Thread(() =>
{
try
{
tcs.SetResult(func());
}
catch (Exception ex)
{
tcs.SetException(ex);
}
})
{
IsBackground = true
}.Start();
return tcs.Task;
}
TaskCompletionSource終極奧義
TaskCompletionSource自身創(chuàng)建Task,但并不占用線程(見示例代碼)
特別需要說(shuō)明的一點(diǎn),Task中的Delay和Thread的Sleep不一樣的是,Sleep不占用CPU處理資源而Delay會(huì),因?yàn)樗皇茄舆t了幾秒執(zhí)行代碼而已。
示例代碼:
static void Main(string[] args)
{
//5秒鐘之后,Continuation開始的時(shí)候,才占用線程
Delay(5000).GetAwaiter().OnCompleted(() => Console.WriteLine(42));
Console.ReadKey();
}
static Task Delay(int milliseconds)
{
var tcs = new TaskCompletionSource<object>();
var timer = new System.Timers.Timer(milliseconds) { AutoReset = false };
timer.Elapsed += delegate { timer.Dispose(); tcs.SetResult(null); };
timer.Start();
return tcs.Task;
}
