C++異步從理論到實(shí)踐!

導(dǎo)語 | C++20帶來了coroutine特性, 同時(shí)新的execution也在提案過程中, 這兩者都給我們在C++中解決異步問題帶來了新的思路。但對比其他語言的實(shí)現(xiàn),C++的協(xié)程和后續(xù)的execution都存在一定的理解和封裝成本,本系列的分享我們將圍繞基本的原理,相應(yīng)的封裝,以及剖析優(yōu)秀的第三方實(shí)現(xiàn),最終結(jié)合筆者framework落地的情況來展開。
一、糾結(jié)的開篇
之前設(shè)計(jì)我們游戲用的c++框架的時(shí)候,剛好c++20的coroutine已經(jīng)發(fā)布,又因?yàn)槭菍iT給game server用的c++ framework,對多線程的訴求相對有限,或者本著少并發(fā)少奇怪的錯(cuò)誤的原則,除網(wǎng)絡(luò)和IO和日志等少量模塊外,大部分模塊主要還是工作在主線程上的,所以當(dāng)時(shí)設(shè)計(jì)的重點(diǎn)也就放在了c++20 coroutine的包裝和使用上,更多的使用coroutine來完善異步的支持。
但如果考慮到framework作為前后端公用框架的話,原來主要針對主線程使用的包裝的coroutine調(diào)度器就顯得有些不夠用,以此作為基礎(chǔ),我們開始了嘗試結(jié)合比較新的c++異步思路,來重新思考應(yīng)該如何實(shí)現(xiàn)一個(gè)盡量利用c++新特性,業(yè)務(wù)層簡單易用的異步框架了。
本系列的主要內(nèi)容也是圍繞這條主線來鋪開, 過程中我們主要以:
自有的framework異步實(shí)現(xiàn)-主要落地嘗試?yán)胏++20的coroutine實(shí)現(xiàn)一個(gè)業(yè)務(wù)級的調(diào)度器。
asio-這個(gè)應(yīng)該不用多說了,近年來一直高頻迭代,業(yè)界廣泛使用的開源第三方庫,中間的異步任務(wù)調(diào)度,網(wǎng)絡(luò)部分的代碼實(shí)現(xiàn)都非常優(yōu)質(zhì)。
libunifex-最接近當(dāng)前sender/receiver版execution提案的可實(shí)操版本,c++17/20兼容,但不推薦使用c++17的版本進(jìn)行任何嘗試,原因后續(xù)文件會(huì)展開。
這幾個(gè)庫作為基礎(chǔ),逐步展開我們對c++異步的探索,然后再回到落地實(shí)踐這條主線上,探討一個(gè)業(yè)務(wù)側(cè)使用簡單,內(nèi)部高效的異步庫應(yīng)該如何來實(shí)現(xiàn)并落地。當(dāng)然,我們的側(cè)重點(diǎn)主要還是c++異步的調(diào)度和處理上,網(wǎng)絡(luò)相關(guān)的有部分內(nèi)容可能會(huì)簡單提到,但不會(huì)進(jìn)行深入的展開。
其實(shí)整個(gè)嘗試的過程只能說非常不順利了,當(dāng)然,隨著對相關(guān)實(shí)現(xiàn)的深入理解和細(xì)節(jié)的深挖,收益也是頗多的。閑話不多說了,我們直接切入主題,以對異步的思考來展開這篇總覽的內(nèi)容。
二、前塵往事-rstudio framework實(shí)現(xiàn)
rstudio framework的異步框架由兩塊比較獨(dú)立的部分組成:
一部分是源自asio幾年前版本的post和strand部分實(shí)現(xiàn),另外附加了一些業(yè)務(wù)側(cè)較常用的像Fence等對象。
另外一部分是主線程的協(xié)程調(diào)度器實(shí)現(xiàn),這部分最早是基于c++17實(shí)現(xiàn)的一版stackless協(xié)程;另外一版則是gcc11.1正式發(fā)布后,直接用c++20重構(gòu)了整個(gè)實(shí)現(xiàn),直接使用c++20的coroutine的一個(gè)版本。
(一)asio部分
這一部分的內(nèi)容因?yàn)楹罄m(xù)有asio scheduler實(shí)現(xiàn)具體的分析篇章,這個(gè)地方主要以業(yè)務(wù)側(cè)使用進(jìn)行展開了。
executor概述
來源于1.6X boost同期的asio standalone版本。
去除了各平臺(tái)網(wǎng)絡(luò)處理相關(guān)的代碼。
僅保留了post和相關(guān)的功能(新版本有executor實(shí)現(xiàn))
早期c++11兼容,無coroutine支持。
除網(wǎng)絡(luò)庫外,asio非常有使用價(jià)值的一部分代碼。
一個(gè)簡單的使用示例
GJobSystem->Post([]() {//some calculate task here//...GJobSystem->Post([]() {//task notify code here//...},rstudio::JobSystemType::kLogicJob);}, rstudio::JobSystemType::kWorkJob);
相關(guān)的時(shí)序圖:

當(dāng)前框架使用的線程結(jié)構(gòu)

預(yù)定義的枚舉值:
enum class JobSystemType : int {kLogicJob = 0, // logic thread(main thread)kWorkJob, // work threadkSlowJob, // slow work thread(run io or other slow job)kNetworkJob, // add a separate thread for networkkNetworkConnectJob, // extra connect thread for networkkLogJob, // log threadkNotifyExternalJob, // use external process to report something, 1 thread only~~kTotalJobTypes,};
不同Job說明:
kLogicJob:主線程(邏輯線程)執(zhí)行任務(wù)。
kWorkJob:Work Thread線程池執(zhí)行任務(wù)(多個(gè)), 一般是計(jì)算量可控的小任務(wù)。
kSlowJob:IO專用線程池, IO相關(guān)的任務(wù)投遞到本線程池。
kNetworkJob:目前tbuspp專用的處理線程。
kNetworkConnectJob:專用的網(wǎng)絡(luò)連接線程, tbuspp模式下不需要。
kLogJob:日志專用線程, 目前日志模塊是自己起的線程, 可以歸并到此處管理。
kNotifyExternalJob:專用的通知線程, 如lua error的上報(bào), 使用該類型
Timer任務(wù)相關(guān)
相關(guān)接口:
//NoIgnore versionuint64_t JobSystemModule::AddAlwaysRunJob(JobSystemType jobType,threads::ThreadJobFunction&& periodJob,unsigned long periodTimeMs);uint64_t JobSystemModule::AddTimesRunJob(JobSystemType jobType,threads::ThreadJobFunction&& periodJob,unsigned long periodTimeMs,unsigned int runCount);uint64_t JobSystemModule::AddDelayRunJob(JobSystemType jobType,threads::ThreadJobFunction&& periodJob,unsigned long delayTimeMs);void JobSystemModule::KillTimerJob(uint64_t tid);
在線程池上關(guān)聯(lián)執(zhí)行任務(wù)-Strand
特定的情況下, 被派發(fā)到Work線程池的任務(wù)存在依賴關(guān)系。
需要串聯(lián)執(zhí)行的時(shí)候, 這個(gè)時(shí)候我們需要額外的設(shè)施JobStrand。
來保證任務(wù)是按先后依賴關(guān)系來串行執(zhí)行的。
如下圖中part1,part2,part3,part4串行執(zhí)行的情況所示。

示例代碼:
auto strand = GJobSystem->RequestStrand(rstudio::JobSystemType::kWorkJob);starnd.Post([](){//part1~// ...});starnd.Post([](){//part2~// ...});starnd.Post([](){//part3~// ...});starnd.Post([](){//part4~// ...});starnd.Post([](){GJobSystem->Post([](){//return code here// ...}, rstudio::JobSystemType::kLogicJob);});
其他輔助設(shè)施
JobFence
jobs::JobFencePtr JobSystemModule::RequestFence();字面義,柵欄,起到攔截執(zhí)行的作用。
一般多用于模塊的初始化和結(jié)束。
如tbuspp在kNetworkJob上的初始化和結(jié)束。

示例代碼(TcpService的初始化):
job_system_module_->Post([this, workTicket]() {if (!workTicket || workTicket->IsExpired()) return;InitInNetworkThread();},JobSystemType::kNetworkJob);period_task_ptr = job_system_module_->AddAlwaysRunJob(JobSystemType::kNetworkJob,[this, workTicket]() {if (!workTicket || workTicket->IsExpired()) return;LoopInNetworkThread();},10);fence_->FenceTo((int)JobSystemType::kNetworkJob);fence_->Wait();
JobNotify && JobWaiter
jobs::JobWaiterPtr JobSystemModule::RequestWaiter();jobs::JobNotifyPtr JobSystemModule::RequestNotify();
批量任務(wù)管理使用
等待的方式的區(qū)別
JobNotify: 執(zhí)行完成調(diào)用額外指定的回調(diào)。
JobWaiter: 以Wait的方式在特定線程等待所有Job執(zhí)行完成。
JobTicket
jobs::JobTicketPtr JobSystemModule::RequestTicket();令牌對象。
一般用來處理跨線程的生命周期控制。
回調(diào)之前先通過IsExpired()來判斷對應(yīng)對象是否已經(jīng)釋放。
示例代碼:
GJobSystem->Post([this, workTicket]() {if (!workTicket || workTicket->IsExpired()) return;InitInNetworkThread();},JobSystemType::kNetworkJob);
(二)asio與其他實(shí)現(xiàn)的對比
正好今年的GDC上有一個(gè)《One Frame In Halo Infinite》的分享, 里面主要講述的是對Halo Infinite的引擎升級,提供新的JobSystem和新的動(dòng)態(tài)幀的機(jī)制來支撐項(xiàng)目的,我們直接以它為例子來對比一下framework和Halo的實(shí)現(xiàn),并且也借用Halo Infinite的例子,來更好的了解這種lambda post模式的缺陷,以及可以改進(jìn)的點(diǎn)。
Halo引入新的JobSystem主要是為了將老的Tetris結(jié)構(gòu)的并發(fā)模式:

向新的基于Dependency的圖狀結(jié)構(gòu)遷移:

他使用的JobSystem的業(yè)務(wù)Api其實(shí)很簡單, 我們直接來看一下相關(guān)的代碼:
JobSystem& jobSsytem = JobSystem::Get();JobGraphHandle graphHandle = jobSystem.CreateJobGraph();JobHandle jobA = jobSystem.AddJob(graphHandle,"JobA",[](){...} );JobHandle jobB = jobSystem.AddJob(graphHandle,"JobB",[](){...} );jobSystem.AddJobToJobDependency(jobA, jobB);jobSystem.SubmitJobGraph(graphHandle);
通過這樣的機(jī)制,就很容易形成如:

另外還有一個(gè)用于同步的SyncPoint:
JobSystem& jobSystem = JobSystem::Get();JobGraphHandle graphHandle = jobSystem.CreateJobGraph();SyncPointHandle syncPointX = jobSystem.CreateSyncPoint(graphHandle, "SyncPointX");JobHandle jobA = jobSystem.AddJob(graphHandle, "JobA", [](){...});JobHandle jobB = jobSystem.AddJob(graphHandle, "JobB", [](){...});jobSystem.AddJobToSyncPointDependency(jobA, syncPointX);jobSystem.AddSyncPointToJobDependency(syncPointX, jobB);jobSystem.SubmitJobGraph(graphHandle);
大致的作用如下:

這樣在workload主動(dòng)觸發(fā)SyncPoint后,整體執(zhí)行才會(huì)繼續(xù)往下推進(jìn),這樣就能方便的加入一些主動(dòng)的同步點(diǎn)對整個(gè)Graph的執(zhí)行做相關(guān)的控制了。
回到asio,我們前面也介紹了,使用strand和post(),我們也能很方便的構(gòu)造出Graph形的執(zhí)行情況,而SyncPoint其實(shí)類型framework中提供的Event,表達(dá)上會(huì)略有差異,但很容易看出兩套實(shí)現(xiàn)其實(shí)是相當(dāng)類同的。這樣的話,Halo的JobSystem有的所有優(yōu)缺點(diǎn),framework基本也同樣存在了,這里簡單搬運(yùn)一下:

對于復(fù)雜并發(fā)業(yè)務(wù)的表達(dá)以lambda內(nèi)嵌為主,雖然這種方式盡可能保證所有代碼上下文是比較集中的,對比純粹使用callback的模式有所進(jìn)步,但這種自由度過高的方式本身也會(huì)存在一些問題,純粹靠編碼者來維系并發(fā)上下文的正確性,這種情況下狀態(tài)值在lambda之間的傳遞也需要特別的小心,容易出錯(cuò),并且難以調(diào)試。
coroutine實(shí)現(xiàn)部分
coroutine部分之前的帖子里已經(jīng)寫得比較詳細(xì)了,這里僅給出鏈接以及簡單的代碼示例:
如何在C++17中實(shí)現(xiàn)stackless coroutine以及相關(guān)的任務(wù)調(diào)度器。
C++20 Coroutine實(shí)例教學(xué)。
另外還有一個(gè)purecpp大會(huì)的演講視頻,主要內(nèi)容與上述的兩篇文章相關(guān)度比較高,這里也給出相關(guān)的鏈接,感興趣的同學(xué)可以自行觀看:C++20 coroutine原理與應(yīng)用。
代碼示例:
//C++ 20 coroutineauto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");mScheduler.CreateTask20([clientProxy]()-> rstudio::logic::CoResumingTaskCpp20 {auto* task = rco_self_task();printf("step1: task is %llu\n", task->GetId());co_await rstudio::logic::cotasks::NextFrame{};printf("step2 after yield!\n");int c = 0;while (c < 5) {printf("in while loop c=%d\n", c);co_await rstudio::logic::cotasks::Sleep(1000);c++;}for (c = 0; c < 5; c++) {printf("in for loop c=%d\n", c);co_await rstudio::logic::cotasks::NextFrame{};}printf("step3 %d\n", c);auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,[]()-> logic::CoResumingTaskCpp20 {printf("from child coroutine!\n");co_await rstudio::logic::cotasks::Sleep(2000);printf("after child coroutine sleep\n");});printf("new task create in coroutine: %llu\n", newTaskId);printf("Begin wait for task!\n");co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 };printf("After wait for task!\n");rstudio::logic::cotasks::RpcRequestrpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000};auto* rpcret = co_await rpcReq;if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {assert(rpcret->totalRet == 1);auto retval = rpcret->retValue.to<int>();assert(retval == 4);printf("rpc coroutine run suc, val = %d!\n", retval);}else {printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType);}co_await rstudio::logic::cotasks::Sleep(5000);printf("step4, after 5s sleep\n");co_return rstudio::logic::CoNil;} );
執(zhí)行結(jié)果:
step1: task is 1step2 after yield!in while loop c=0in while loop c=1in while loop c=2in while loop c=3in while loop c=4in for loop c=0in for loop c=1in for loop c=2in for loop c=3in for loop c=4step3 5new task create in coroutine: 2Begin wait for task!from child coroutine!after child coroutine sleepAfter wait for task!service yield call finish!rpc coroutine run suc, val = 4!step4, after 5s sleep
整體來看,協(xié)程的使用還是給異步編程帶來了很多便利,但框架本身的實(shí)現(xiàn)其實(shí)還是有比較多迭代優(yōu)化的空間的:
asio的調(diào)度部分與coroutine部分的實(shí)現(xiàn)是分離的。
coroutine暫時(shí)只支持主線程。
小結(jié)
上面也結(jié)合halo的實(shí)例說到了一些限制,那么這些問題有沒有好的解決辦法了,答案是肯定的,雖然execution并未完全通過提案,但整體而言,execution新的sender/reciever模型,對于解決上面提到的一些缺陷,應(yīng)該是提供了非常好的思路,我們下一章節(jié)中繼續(xù)展開。
三、so easy-execution就是解?
最開始的想法其實(shí)比較簡單,結(jié)合原來的framework,適當(dāng)引入提案中的execution一些比較可取的思路,讓framework的異步編程能更多的吸取c++新特性和execution比較高級的框架抽象能力,提升整個(gè)異步庫的實(shí)現(xiàn)質(zhì)量。所以最開始定的主線思路其實(shí)是更多的向execution傾斜,怎么了解掌握execution,怎么與現(xiàn)在的framework結(jié)合成了主線思路。
我們選擇的基礎(chǔ)參考庫是來自沖元宇宙這波改名的Meta公司的libunifex,客觀來說,Meta公司的folly庫,以及l(fā)ibunifex庫的實(shí)現(xiàn)質(zhì)量,肯定都是業(yè)界前沿的,對c++新特性的使用和探索,也是相當(dāng)給力的。這些我們后續(xù)在分析libunifex具體實(shí)現(xiàn)的篇章中也能實(shí)際感受到。但深入了解libunifex后,我們會(huì)發(fā)現(xiàn),它的優(yōu)點(diǎn)有不少:
嘗試為c++提供表達(dá)異步的框架性結(jié)構(gòu)。
泛型用得出神入化,ponder在它前面基本是小弟級別的,一系列泛用性特別強(qiáng)的template 編程示例,比如隱含在sender/receiver思路內(nèi)的lazy evaluate表達(dá),如何在大量使用泛型的情況下提供業(yè)務(wù)定制點(diǎn)等等。
結(jié)構(gòu)化的表達(dá)并發(fā)和異步,相關(guān)代碼的編寫從自由發(fā)揮自主把控走向框架化,約束化,能夠更有序更可靠的表達(dá)復(fù)雜異步邏輯。
整個(gè)執(zhí)行pipeline的組織,所有信息是compile time和runtime完備的,dependencies不會(huì)丟失。
節(jié)點(diǎn)之間的值類型是強(qiáng)制檢查的,有問題的情況,大多時(shí)候compiler time就會(huì)報(bào)錯(cuò)。
有不少優(yōu)點(diǎn)的同時(shí),也有很多缺點(diǎn):
整個(gè)庫的實(shí)現(xiàn)嚴(yán)重依賴了c++20 ranges采用的一種定制手段cpo,并且也使用了類似ranges的pipe表達(dá)方法,理解相關(guān)代碼存在一定的門檻(后續(xù)會(huì)有具體的篇章展開相關(guān)的內(nèi)容)
庫同時(shí)向下兼容了c++17,但由于c++17本身特性的限制,引入了大量的宏,以及X Macros展開的方式,導(dǎo)致相關(guān)的代碼閱讀難度進(jìn)一步提升。但實(shí)際上c++17版本并不具備可維護(hù)的價(jià)值,依賴SIFINAE的實(shí)現(xiàn),如果中間任何一環(huán)報(bào)錯(cuò),必然需要在N屏的報(bào)錯(cuò)中尋找有效信息。
libunifex對coroutine的支持存疑,雖然讓coroutine可以作為一種reciever存在,但本質(zhì)上來說,coroutine其實(shí)更適合拿來做流程控制的膠水,而不是作為異步中的某個(gè)節(jié)點(diǎn)存在。
默認(rèn)的scheduler實(shí)現(xiàn)質(zhì)量離工業(yè)級還存在一定的距離,這一點(diǎn)后續(xù)的代碼分析中也會(huì)具體提到。
諸多問題的存在,可能也是execution提案沒有短時(shí)間內(nèi)獲得通過的原因吧。但整體來說,execution本身的理念還是很有參考價(jià)值的,但以它的現(xiàn)狀來說,離最終的解肯定還是有比較大的距離的。
四、嘗試重新思考-要什么,用什么
事情到這個(gè)點(diǎn)就有點(diǎn)尷尬了,原有的asio,架構(gòu)層面來說,跟新的execution是存在落差的。而項(xiàng)目實(shí)踐上來說,asio相當(dāng)穩(wěn)扎穩(wěn)打,而以libunifex當(dāng)前的狀態(tài)來說,離工業(yè)化使用其實(shí)是有一定距離的。但asio作者在21年時(shí)候的兩篇演講(更像coding show):
Talking Async Ep1: Why C++20 is the Awesomest Language for Network Programming
Talking Async Ep2: Cancellation in depth
第一篇基本整個(gè)演示了asio從最開始的callback,到融入c++20 coroutine后的優(yōu)雅異步表達(dá),我們可以通過下面的代碼片斷感受一下:
asio相關(guān)示例代碼1
awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target){for (;;){auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);if (e)break;auto ex = client.get_executor();co_spawn(ex, proxy(std::move(client), target), detached);}}
asio相關(guān)示例代碼2
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);if (!e){co_await ((transfer(client, server, client_to_server_deadline) ||watchdog(client_to_server_deadline))&&(transfer(server, client, server_to_client_deadline) ||watchdog(server_to_client_deadline)));}
對比原來每個(gè)async_xxx()函數(shù)后接callback的模式,整個(gè)實(shí)現(xiàn)可以說是相當(dāng)?shù)膬?yōu)雅了,代碼的可讀性也得到了極大的提高,這兩段代碼都來自于上面的演講中,想深入了解的可以直接打開相關(guān)的鏈接觀看視頻,很推薦大家去看一下。
能夠把復(fù)雜的事情用更簡潔易懂的方法表達(dá),這肯定是讓人振奮的,當(dāng)然,深入了解相關(guān)實(shí)現(xiàn)后,也會(huì)發(fā)現(xiàn)存在一些問題,但我們的本意是參考學(xué)習(xí),得出最終想要的可以比較好的支撐并發(fā)和異步業(yè)務(wù)的基礎(chǔ)框架,有這些,其實(shí)已經(jīng)可以理出一條比較清晰的思路了:
execution部分主要使用它的sender/receiver概念,和它提供的一些通用的算法。移除掉所有因?yàn)閒allback c++17引入的大量代碼噪聲。拋棄它并不完備的各種scheduler實(shí)現(xiàn)。
協(xié)程借鑒部分asio的思路,首先讓協(xié)程可以基于context上下文,在跨線程的情況下使用,另外更多還是使用原有框架有明確的scheduler的方式對所有協(xié)程進(jìn)行管理和定制的模式。
使用asio的scheduler部分作為execution的底層scheduler實(shí)現(xiàn),同時(shí)也使用asio的timer表達(dá),去除原始libunifex依賴不同scheduler提供schedule_at()方法來執(zhí)行定時(shí)器相關(guān)邏輯的實(shí)現(xiàn)。
根據(jù)業(yè)務(wù)需要,定制一些必要的sender adapter等簡化業(yè)務(wù)的使用。
嘗試用execution框架對接ISPC等特殊的并發(fā)庫,能夠以一個(gè)清晰的方式來表達(dá)這種混合環(huán)境上執(zhí)行的邏輯。
本系列涉及的基礎(chǔ)知識和相關(guān)內(nèi)容比較多,后續(xù)慢慢展開。目前的思路是先介紹大家相對熟悉度不那么高的execution基礎(chǔ)知識和libunifex,后面再介紹asio相關(guān)的scheduler以及coroutine實(shí)現(xiàn),最后再回歸筆者正在迭代的framework,這樣一個(gè)順序來展開。
作者簡介
沈芳
騰訊后臺(tái)開發(fā)工程師
IEG研發(fā)效能部開發(fā)人員,畢業(yè)于華中科技大學(xué)。目前負(fù)責(zé)CrossEngine Server的開發(fā)工作,對GamePlay技術(shù)比較感興趣。
推薦閱讀
C++反射:深入探究function實(shí)現(xiàn)機(jī)制!

