C++異步從理論到實踐總覽篇

作者:fangshen,騰訊 IEG 客戶端開發(fā)工程師
C++20帶來了coroutine特性, 同時新的execution也在提案過程中, 這兩者都給我們在C++中解決異步問題帶來了新的思路. 但對比其他語言的實現(xiàn), C++的協(xié)程和后續(xù)的execution都存在一定的理解和封裝成本, 本系列的分享我們將圍繞基本的原理, 相應的封裝, 以及剝析優(yōu)秀的第三方實現(xiàn), 最終結合筆者framework落地的情況來展開.
1. 糾結的開篇
之前設計我們游戲用的c++框架的時候, 剛好c++20的coroutine已經(jīng)發(fā)布, 又因為是專門 給game server用的c++ framework, 對多線程的訴求相對有限, 或者本著少并發(fā)少奇怪的錯誤的原則, 除網(wǎng)絡和IO和日志等少量模塊外, 大部分模塊主要還是工作在主線程上的, 所以當時設計的重點也就放在了c++20 coroutine的包裝和使用上, 更多的使用coroutine來完善異步的支持. 但如果考慮到framework作為前后端公用框架的話, 原來主要針對主線程使用的包裝的coroutine調(diào)度器就顯得有些不夠用, 以此作為基礎, 我們開始了嘗試結合比較新的c++異步思路, 來重新思考應該如何實現(xiàn)一個盡量利用c++新特性, 業(yè)務層簡單易用的異步框架了.
本系列的主要內(nèi)容也是圍繞這條主線來鋪開, 過程中我們 主要以:
自有的framework異步實現(xiàn) - 主要落地嘗試利用c++20的coroutine實現(xiàn)一個業(yè)務級的調(diào)度器. asio - 這個應該不用多說了, 近年來一直高頻迭代, 業(yè)界廣泛使用的開源第三方庫, 中間的異步任務調(diào)度, 網(wǎng)絡部分的代碼實現(xiàn)都非常優(yōu)質(zhì). libunifex - 最接近當前sender/receiver版 execution提案的可實操版本, c++17/20兼容, 但不推薦使用c++17的版本進行任何嘗試, 原因后續(xù)文件會展開.
這幾個庫作為基礎, 逐步展開我們對c++異步的探索, 然后再回到落地實踐這條主線上, 探討一個業(yè)務側(cè)使用簡單, 內(nèi)部高效的異步庫應該如何來實現(xiàn)并落地. 當然, 我們的側(cè)重點主要還是c++異步的調(diào)度和處理上, 網(wǎng)絡相關的有部分內(nèi)容可能會簡單提到, 但不會進行深入的展開. ??其實整個嘗試的過程只能說非常不順利了, 當然, 隨著對相關實現(xiàn)的深入理解和細節(jié)的深挖, 收益也是頗多的. 閑話不多說了, 我們直接切入主題, 以對異步的思考來展開這篇總覽的內(nèi)容.
2. 前塵往事 - rstudio framework實現(xiàn)
rstudio framework的異步框架由兩塊比較獨立的部分組成:
一部分是源自asio幾年前版本的post和strand部分實現(xiàn), 另外附加了一些業(yè)務側(cè)較常用的像Fence等對象; 另外一部分是主線程的協(xié)程調(diào)度器實現(xiàn), 這部分最早是基于c++17實現(xiàn)的一版stackless 協(xié)程; 另外一版則是gcc11.1正式發(fā)布后, 直接用c++20重構了整個實現(xiàn), 直接使用c++20的coroutine的一個版本.
2.1 asio 部分
??這一部分的內(nèi)容因為后續(xù)有asio scheduler實現(xiàn)具體的分析篇章, 這個地方主要以業(yè)務側(cè)使用進行展開了.
2.1.1 executor概述
來源于1.6X boost同期的asio standalone版本 去除了各平臺網(wǎng)絡處理相關的代碼 僅保留了post和相關的功能(新版本有executor實現(xiàn)) 早期c++11兼容, 無coroutine支持 除網(wǎng)絡庫外, asio非常有使用價值的一部分代碼
2.1.2 一個簡單的使用示例
GJobSystem->Post([]() {
//some calculate task here
//...
GJobSystem->Post(
[]() {
//task notify code here
//...
},
rstudio::JobSystemType::kLogicJob);
}, rstudio::JobSystemType::kWorkJob);
相關的時序圖:

2.1.3 當前框架使用的線程結構

預定義的枚舉值:
enum class JobSystemType : int {
kLogicJob = 0, // logic thread(main thread)
kWorkJob, // work thread
kSlowJob, // slow work thread(run io or other slow job)
kNetworkJob, // add a separate thread for network
kNetworkConnectJob, // extra connect thread for network
kLogJob, // log thread
kNotifyExternalJob, // use external process to report something, 1 thread only~~
kTotalJobTypes,
};
不同Job說明:
kLogicJob
主線程(邏輯線程)執(zhí)行任務 kWorkJob
Work Thread線程池執(zhí)行任務(多個), 一般是計算量可控的小任務 kSlowJob
IO專用線程池, IO相關的任務投遞到本線程池 kNetworkJob
目前tbuspp專用的處理線程 kNetworkConnectJob
專用的網(wǎng)絡連接線程, tbuspp模式下不需要 kLogJob
日志專用線程, 目前日志模塊是自己起的線程, 可以歸并到此處管理 kNotifyExternalJob
專用的通知線程, 如lua error的上報, 使用該類型
2.1.4 Timer任務相關
相關接口:
//NoIgnore version
uint64_t JobSystemModule::AddAlwaysRunJob(JobSystemType jobType,
threads::ThreadJobFunction&& periodJob,
unsigned long periodTimeMs);
uint64_t JobSystemModule::AddTimesRunJob(JobSystemType jobType,
threads::ThreadJobFunction&& periodJob,
unsigned long periodTimeMs,
unsigned int runCount);
uint64_t JobSystemModule::AddDelayRunJob(JobSystemType jobType,
threads::ThreadJobFunction&& periodJob,
unsigned long delayTimeMs);
void JobSystemModule::KillTimerJob(uint64_t tid);
本部分并未直接使用asio原始的basic_waitable_timer實現(xiàn), 而是自己實現(xiàn)的定時任務.
2.1.5 在線程池上關聯(lián)執(zhí)行任務 - Strand
特定的情況下, 被派發(fā)到Work線程池的任務存在依賴關系
需要串聯(lián)執(zhí)行的時候, 這個時候我們需要額外的設施 JobStrand
來保證任務是按先后依賴關系來串行執(zhí)行的
如下圖中part1, part2, part3, part4串行執(zhí)行的情況所示

示例代碼:
auto strand = GJobSystem->RequestStrand(rstudio::JobSystemType::kWorkJob);
starnd.Post([](){
//part1~
// ...
});
starnd.Post([](){
//part2~
// ...
});
starnd.Post([](){
//part3~
// ...
});
starnd.Post([](){
//part4~
// ...
});
starnd.Post([](){
GJobSystem->Post([](){
//return code here
// ...
}, rstudio::JobSystemType::kLogicJob);
});
2.1.6 其他輔助設施
JobFence
jobs::JobFencePtr JobSystemModule::RequestFence();
字面義, 柵欄, 起到攔截執(zhí)行的作用. 一般多用于模塊的初始化和結束 如tbuspp在kNetworkJob上的初始化和結束.

示例代碼(TcpService的初始化):
job_system_module_->Post(
[this, workTicket]() {
if (!workTicket || workTicket->IsExpired()) return;
InitInNetworkThread();
},
JobSystemType::kNetworkJob);
period_task_ptr = job_system_module_->AddAlwaysRunJob(
JobSystemType::kNetworkJob,
[this, workTicket]() {
if (!workTicket || workTicket->IsExpired()) return;
LoopInNetworkThread();
},
10);
fence_->FenceTo((int)JobSystemType::kNetworkJob);
fence_->Wait();
JobNotify && JobWaiter
jobs::JobWaiterPtr JobSystemModule::RequestWaiter();
jobs::JobNotifyPtr JobSystemModule::RequestNotify();
批量任務管理使用 等待的方式的區(qū)別 JobNotify: 執(zhí)行完成調(diào)用額外指定的回調(diào). JobWaiter: 以Wait的方式在特定線程等待所有Job執(zhí)行完成.
JobTicket
jobs::JobTicketPtr JobSystemModule::RequestTicket();
令牌對象 一般用來處理跨線程的生命周期控制 回調(diào)之前先通過IsExpired()來判斷對應對象是否已經(jīng)釋放
示例代碼:
GJobSystem->Post(
[this, workTicket]() {
if (!workTicket || workTicket->IsExpired()) return;
InitInNetworkThread();
},
JobSystemType::kNetworkJob);
2.2 asio 與其他實現(xiàn)的對比
??正好今年的GDC上有一個<<One Frame In Halo Infinite>>的分享, 里面主要講述的是對Halo Infinite的引擎升級, 提供新的JobSystem和新的動態(tài)幀的機制來支撐項目的, 我們直接以它為例子來對比一下framework和Halo的實現(xiàn), 并且也借用Halo Infinite的例子, 來更好的了解這種lambda post模式的缺陷, 以及可以改進的點. ??Halo引入新的JobSystem主要是為了將老的Tetris結構的并發(fā)模式:

向新的基于Dependency的圖狀結構遷移:
他使用的JobSystem的業(yè)務Api其實很簡單, 我們直接來看一下相關的代碼:
JobSystem& jobSsytem = JobSystem::Get();
JobGraphHandle graphHandle = jobSystem.CreateJobGraph();
JobHandle jobA = jobSystem.AddJob(
graphHandle,
"JobA",
[](){...} );
JobHandle jobB = jobSystem.AddJob(
graphHandle,
"JobB",
[](){...} );
jobSystem.AddJobToJobDependency(jobA, jobB);
jobSystem.SubmitJobGraph(graphHandle);
通過這樣的機制, 就很容易形成如:
另外還有一個用于同步的SyncPoint:
JobSystem& jobSystem = JobSystem::Get();
JobGraphHandle graphHandle = jobSystem.CreateJobGraph();
SyncPointHandle syncPointX = jobSystem.CreateSyncPoint(graphHandle, "SyncPointX");
JobHandle jobA = jobSystem.AddJob(graphHandle, "JobA", [](){...});
JobHandle jobB = jobSystem.AddJob(graphHandle, "JobB", [](){...});
jobSystem.AddJobToSyncPointDependency(jobA, syncPointX);
jobSystem.AddSyncPointToJobDependency(syncPointX, jobB);
jobSystem.SubmitJobGraph(graphHandle);
大致的作用如下:

這樣在workload主動觸發(fā)SyncPoint后, 整體執(zhí)行才會繼續(xù)往下推進, 這樣就能方便的加入一些主動的同步點對整個Graph的執(zhí)行做相關的控制了。
回到asio, 我們前面也介紹了, 使用strand和post(), 我們也能很方便的構造出Graph形的執(zhí)行情況 , 而SyncPoint其實類型framework中提供的Event, 表達上會略有差異, 但很容易看出兩套實現(xiàn)其實是相當類同的. 這樣的話, Halo 的JobSystem有的所有優(yōu)缺點, framework基本也同樣存在了, 這里簡單搬運一下:

對于復雜并發(fā)業(yè)務的表達以lambda內(nèi)嵌為主, 雖然這種方式盡可能保證所有代碼上下文是比較集中的, 對比純粹使用callback的模式有所進步, 但這種自由度過高的方式本身也會存在一些問題, 純粹靠編碼者來維系并發(fā)上下文的正確性, 這種情況下狀態(tài)值在lambda之間的傳遞也需要特別的小心, 容易出錯, 并且難以調(diào)試。
2.3 coroutine實現(xiàn)部分
coroutine部分之前的帖子里已經(jīng)寫得比較詳細了, 這里僅給出鏈接以及簡單的代碼示例:
如何在C++17中實現(xiàn)stackless coroutine以及相關的任務調(diào)度器 C++20 Coroutine實例教學 另外還有一個purecpp大會的演講視頻, 主要內(nèi)容與上述的兩篇文章相關度比較高, 這里也給出相關的鏈接, 感興趣的同學可以自行觀看:C++20 coroutine原理與應用
代碼示例:
//C++ 20 coroutine
auto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");
mScheduler.CreateTask20([clientProxy]()
-> rstudio::logic::CoResumingTaskCpp20 {
auto* task = rco_self_task();
printf("step1: task is %llu\n", task->GetId());
co_await rstudio::logic::cotasks::NextFrame{};
printf("step2 after yield!\n");
int c = 0;
while (c < 5) {
printf("in while loop c=%d\n", c);
co_await rstudio::logic::cotasks::Sleep(1000);
c++;
}
for (c = 0; c < 5; c++) {
printf("in for loop c=%d\n", c);
co_await rstudio::logic::cotasks::NextFrame{};
}
printf("step3 %d\n", c);
auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,
[]()-> logic::CoResumingTaskCpp20 {
printf("from child coroutine!\n");
co_await rstudio::logic::cotasks::Sleep(2000);
printf("after child coroutine sleep\n");
});
printf("new task create in coroutine: %llu\n", newTaskId);
printf("Begin wait for task!\n");
co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 };
printf("After wait for task!\n");
rstudio::logic::cotasks::RpcRequest
rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000};
auto* rpcret = co_await rpcReq;
if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {
assert(rpcret->totalRet == 1);
auto retval = rpcret->retValue.to<int>();
assert(retval == 4);
printf("rpc coroutine run suc, val = %d!\n", retval);
}
else {
printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType);
}
co_await rstudio::logic::cotasks::Sleep(5000);
printf("step4, after 5s sleep\n");
co_return rstudio::logic::CoNil;
} );
執(zhí)行結果:
step1: task is 1
step2 after yield!
in while loop c=0
in while loop c=1
in while loop c=2
in while loop c=3
in while loop c=4
in for loop c=0
in for loop c=1
in for loop c=2
in for loop c=3
in for loop c=4
step3 5
new task create in coroutine: 2
Begin wait for task!
from child coroutine!
after child coroutine sleep
After wait for task!
service yield call finish!
rpc coroutine run suc, val = 4!
step4, after 5s sleep
整體來看, 協(xié)程的使用還是給異步編程帶來了很多便利, 但框架本身的實現(xiàn)其實還是有比較多迭代優(yōu)化的空間的:
asio的調(diào)度部分與coroutine部分的實現(xiàn)是分離的 coroutine暫時只支持主線程
2.4 小結
上面也結合halo的實例說到了一些限制, 那么這些問題有沒有好的解決辦法了, 答案是肯定的, 雖然execution并未完全通過提案, 但整體而言, execution新的sender/reciever模型, 對于解決上面提到的一些缺陷, 應該是提供了非常好的思路, 我們下一章節(jié)中繼續(xù)展開.
3. so easy - execution就是解?
最開始的想法其實比較簡單, 結合原來的framework, 適當引入提案中的execution一些比較可取的思路, 讓framework的異步編程能更多的吸取c++新特性和execution比較高級的框架抽象能力, 提升整個異步庫的實現(xiàn)質(zhì)量. 所以最開始定的主線思路其實是更多的向execution傾斜, 怎么了解掌握execution, 怎么與現(xiàn)在的framework結合成了主線思路.
我們選擇的基礎參考庫是來自沖元宇宙這波改名的Meta公司的libunifex, 客觀來說, Meta公司的folly庫, 以及l(fā)ibunifex庫的實現(xiàn)質(zhì)量, 肯定都是業(yè)界前沿的, 對c++新特性的使用和探索, 也是相當給力的. 這些我們后續(xù)在分析libunifex具體實現(xiàn)的篇章中也能實際感受到.
但深入了解libunifex后, 我們會發(fā)現(xiàn), 它的優(yōu)點有不少:
嘗試為c++提供表達異步的框架性結構. 泛型用得出神入化, ponder在它前面基本是小弟級別的, 一系列泛用性特別強的template 編程示例, 比如隱含在sender/receiver思路內(nèi)的lazy evaluate表達, 如何在大量使用泛型的情況下提供業(yè)務定制點等等. 結構化的表達并發(fā)和異步, 相關代碼的編寫從自由發(fā)揮自主把控走向框架化, 約束化, 能夠更有序更可靠的表達復雜異步邏輯 整個執(zhí)行pipeline的組織, 所有信息是compile time和runtime完備的, dependencies不會丟失. 節(jié)點之間的值類型是強制檢查的, 有問題的情況 , 大多時候compiler time就會報錯. 有不少優(yōu)點的同時, 也有很多缺點: 整個庫的實現(xiàn)嚴重依賴了c++20 ranges采用的一種定制手段 cpo, 并且也使用了類似ranges的pipe表達方法, 理解相關代碼存在一定的門坎.(后續(xù)會有具體的篇章展開相關的內(nèi)容) 庫同時向下兼容了c++17, 但由于c++17本身特性的限制, 引入了大量的宏, 以及X Macros展開的方式, 導致相關的代碼閱讀難度進一步提升. 但實際上c++17版本并不具備可維護的價值, 依賴SIFINAE的實現(xiàn), 如果中間任何一環(huán)報錯, 必然需要在N屏的報錯中尋找有效信息. libunifex對coroutine的支持存疑, 雖然讓coroutine可以作為一種reciever存在, 但本質(zhì)上來說, coroutine其實更適合拿來做流程控制的膠水, 而不是作為異步中的某個節(jié)點存在. 默認的scheduler實現(xiàn)質(zhì)量離工業(yè)級還存在一定的距離, 這一點后續(xù)的代碼分析中也會具體提到. 諸多問題的存在, 可能也是execution提案沒有短時間內(nèi)獲得通過的原因吧, 但整體來說, execution本身的理念還是很有參考價值的, 但以它的現(xiàn)狀來說, 離最終的解肯定還是有比較大的距離的.
4. 嘗試重新思考 - 要什么, 用什么
事情到這個點就有點尷尬了, 原有的asio, 架構層面來說, 跟新的execution是存在落差的. 而項目實踐上來說, asio相當穩(wěn)扎穩(wěn)打, 而以libunifex當前的狀態(tài)來說, 離工業(yè)化使用其實是有一定距離的. 但asio作者在21年時候的兩篇演講(更像coding show):
Talking Async Ep1: Why C++20 is the Awesomest Language for Network Programming Talking Async Ep2: Cancellation in depth第一篇基本整個演示了asio從最開始的callback, 到融入c++20 coroutine后的優(yōu)雅異步表達, 我們可以通過下面的代碼片斷感受一下:
asio相關示例代碼1
awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
for (;;)
{
auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
if (e)
break;
auto ex = client.get_executor();
co_spawn(ex, proxy(std::move(client), target), detached);
}
}
asio相關示例代碼2
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
if (!e)
{
co_await (
(
transfer(client, server, client_to_server_deadline) ||
watchdog(client_to_server_deadline)
)
&&
(
transfer(server, client, server_to_client_deadline) ||
watchdog(server_to_client_deadline)
)
);
}
對比原來每個async_xxx()函數(shù)后接callback的模式, 整個實現(xiàn)可以說是相當?shù)膬?yōu)雅了, 代碼的可讀性也得到了極大的提高, 這兩段代碼都來自于上面的演講中, 想深入了解的可以直接打開相關的鏈接觀看視頻, 很推薦大家去看一下. ??能夠把復雜的事情用更簡潔易懂的方法表達, 這肯定是讓人振奮的, 當然, 深入了解相關實現(xiàn)后, 也會發(fā)現(xiàn)存在一些問題, 但我們的本意是參考學習, 得出最終想要的可以比較好的支撐并發(fā)和異步業(yè)務的基礎框架, 有這些, 其實已經(jīng)可以理出一條比較清晰的思路了:
execution部分主要使用它的sender/receiver概念, 和它提供的一些通用的算法. 移除掉所有因為fallback c++17引入的大量代碼噪聲. 拋棄它并不完備的各種scheduler實現(xiàn) 協(xié)程借鑒部分asio的思路, 首先讓協(xié)程可以基于context上下文, 在跨線程的情況下使用, 另外更多還是使用原有框架有明確的scheduler的方式對所有協(xié)程進行管理和定制的模式. 使用asio的scheduler部分作為execution的底層scheduler實現(xiàn), 同時也使用asio的timer表達, 去除原始libunifex依賴不同scheduler提供schedule_at()方法來執(zhí)行定時器相關邏輯的實現(xiàn). 根據(jù)業(yè)務需要, 定制一些必要的sender adapter等簡化業(yè)務的使用. 嘗試用execution框架對接ISPC等特殊的并發(fā)庫, 能夠以一個清晰的方式來表達這種混合環(huán)境上執(zhí)行的邏輯.
本系列涉及的基礎知識和相關內(nèi)容比較多, 先給出一個臨時的大綱, 后續(xù)可能會有調(diào)整. 目前的思路是先介紹大家相對熟悉度不那么高的execution基礎知識和libunifex, 后面再介紹asio相關的scheduler以及coroutine實現(xiàn), 最后再回歸筆者正在迭代的framework, 這樣一個順序來展開.
參考
One Frame in Halo Infinite asio官網(wǎng) libunifex源碼庫
