C++異步:libunifex的scheduler實(shí)現(xiàn)!

導(dǎo)語 | 本篇我們將集中介紹在cpu thread類型的execution context,不涉及異構(gòu)的execution context實(shí)現(xiàn)和調(diào)度。
前篇《C++異步:libunifex中的concepts詳解!》中我們相對深入的介紹了libunifex中的concepts的方方面面,對execution的整體設(shè)計(jì)框架有了一個(gè)基礎(chǔ)的認(rèn)知,本篇我們將繼續(xù)介紹作為execution執(zhí)行基石的scheduler的實(shí)現(xiàn)細(xì)節(jié)。本篇的介紹集中在cpu thread類型的execution context上,不涉及異構(gòu)的execution context實(shí)現(xiàn)和調(diào)度。
一、scheduler的實(shí)現(xiàn)概述
我們先來回顧一下scheduler在execution整體設(shè)計(jì)中的位置和作用:

libunifex中的Scheduler其實(shí)就是一個(gè)輕量的Wrapper,真正負(fù)責(zé)異步任務(wù)執(zhí)行的是底層的Execution Context實(shí)現(xiàn)。對于非異構(gòu)的實(shí)現(xiàn), 這里的Execution Context一般代表一個(gè)Cpu線程或者一組Cpu線程(線程池),最簡單的情況是相關(guān)的任務(wù)被投遞到一個(gè)線程上來執(zhí)行。我們會(huì)通過Scheduler對相關(guān)的Execution Context再包裝一次,pipeline組織的過程中將只涉及Scheduler,但Scheduler內(nèi)部一般都會(huì)包含相關(guān)Exectuion Context的包裝與實(shí)現(xiàn)。
未做任何加工的情況,我們能夠想象,所有事情都將一口氣在Assembly Thread上發(fā)生完畢,那如果我們要實(shí)現(xiàn)將異步操作調(diào)度到工作線程上執(zhí)行,應(yīng)該如何實(shí)現(xiàn)呢?這里我們直接以libunifex中比較常用的manual_event_loop實(shí)現(xiàn)來說明整個(gè)實(shí)現(xiàn)邏輯:

直接看圖整個(gè)實(shí)現(xiàn)還是比較復(fù)雜的,但其實(shí)最重要的地方只有幾處:
context實(shí)現(xiàn)-Work Thread本身的工作機(jī)制,它應(yīng)該是能夠主動(dòng)執(zhí)行自身包含的任務(wù)隊(duì)列的。
與execution橋接-Work Thread提供機(jī)制,允許其它線程向自己插入待執(zhí)行的任務(wù),并且我們需要將相關(guān)的任務(wù)包裝為符合exection設(shè)計(jì)的形態(tài)。
context與物理線程關(guān)聯(lián)-我們最后肯定需要將context的運(yùn)行與一個(gè)具體的物理線程關(guān)聯(lián)起來,這樣context才能不斷的執(zhí)行投遞到其中的task。做到這幾點(diǎn),整個(gè)異步操作的執(zhí)行就自然的轉(zhuǎn)移到Work Thread了。
下文我們將結(jié)合具體的代碼實(shí)現(xiàn)來分析這兩點(diǎn)是怎么達(dá)成的。
二、context實(shí)現(xiàn)分析
manual_event_loop版
manual_event_loop版的context實(shí)現(xiàn)比較簡潔。它的默認(rèn)調(diào)度器實(shí)現(xiàn)的核心實(shí)現(xiàn)位于manual_event_loop.h&與大部分調(diào)度器實(shí)現(xiàn)類似,它采用context與task的相關(guān)抽象與線程是剝離的方式,主要完成兩部分的功能:
實(shí)現(xiàn)一個(gè)標(biāo)準(zhǔn)的任務(wù)管理器,有標(biāo)準(zhǔn)的任務(wù)加入和執(zhí)行接口。
完成與execution系統(tǒng)的橋接,這部分主要是由scheduler實(shí)現(xiàn)來完成的。??
下面我們分開來看一下這兩部分的實(shí)現(xiàn):
(一)context與task_base的實(shí)現(xiàn)
一個(gè)標(biāo)準(zhǔn)的任務(wù)管理器在很多地方我們可能都會(huì)用到,比如libunifex中的異步任務(wù)調(diào)度,比如一些定時(shí)器的實(shí)現(xiàn)。相關(guān)的代碼實(shí)現(xiàn)一般都比較簡單,libunifex的實(shí)現(xiàn)也同樣如此,相關(guān)的概覽圖如下:

核心代碼如下:task_base:
struct manual_event_loop::task_base {using execute_fn = void(task_base*) noexcept;explicit task_base(execute_fn* execute) noexcept: execute_(execute){}void execute() noexcept {this->execute_(this);}task_base* next_ = nullptr;execute_fn* execute_;};
context:
class manual_event_loop::context {void run() {std::unique_lock lock{mutex_};while (true) {while (head_ == nullptr) {if (stop_) return;cv_.wait(lock);}auto* task = head_;head_ = task->next_;if (head_ == nullptr) {tail_ = nullptr;}lock.unlock();task->execute();lock.lock();}}void stop() {std::unique_lock lock{mutex_};stop_ = true;cv_.notify_all();}private:void enqueue(task_base* task) {std::unique_lock lock{mutex_};if (head_ == nullptr) {head_ = task;} else {tail_->next_ = task;}tail_ = task;task->next_ = nullptr;cv_.notify_one();}std::mutex mutex_;std::condition_variable cv_;task_base* head_ = nullptr;task_base* tail_ = nullptr;bool stop_ = false;};
上述利用鏈表實(shí)現(xiàn)了一個(gè)FIFO的task隊(duì)列,然后提供了一個(gè)enque()接口用于推送新任務(wù)到context,一個(gè)run()接口用于執(zhí)行整個(gè)任務(wù)隊(duì)列。核心代碼使用了一個(gè)std::mutex和std::condition_variable,模擬了一個(gè)類似semaphore的作用,這樣我們跨線程的執(zhí)行run(),enque(),核心代碼本身就是適配多線程的。代碼細(xì)節(jié)我們不一一展開了,相關(guān)的實(shí)現(xiàn)直接看代碼理解更方便。同時(shí)有心的讀者可能會(huì)注意到,libunifex的任務(wù)管理器實(shí)現(xiàn)用的是raw pointer的task_base,這樣會(huì)不會(huì)出現(xiàn)memory leak相關(guān)的問題呢?這部分的答案我們在下一章節(jié)解釋。
從上面的代碼中可以看到,libunifex的實(shí)現(xiàn)比較多的依賴臨時(shí)命名空間(以'_'打頭的命名空間),然后再通過using給出外部的使用名稱。所以在代碼里我們可能會(huì)看到不同臨時(shí)命名空間下的很多context實(shí)現(xiàn),注意區(qū)分好相關(guān)的命名空間,同名的context一般都是類同的實(shí)現(xiàn),習(xí)慣了理解相關(guān)代碼也是比較方便的。
三、與execution的橋接
區(qū)別于其他的task處理框架,如asio的lambda post模式,execution框架是通過泛型的connect()和start()來完成對sender和receiver的連接和使用的,libunifex關(guān)聯(lián)context&task_base與execution框架其他部分的方式也沿續(xù)了這種思路。下面我們通過具體的代碼來看libunifex是如何通過對task_base泛型的支持,以及特定的connect() + start()的實(shí)現(xiàn),來完成相關(guān)的橋接的。tast_base的泛型實(shí)現(xiàn):我們先來看一下task_base的泛型實(shí)現(xiàn),其實(shí)也就是scheduler產(chǎn)生的sender和任意receiver執(zhí)行connect()操作后產(chǎn)生的OperationState:
template <typename Receiver>class operation final : task_base {public:template <typename Receiver2>explicit type(Receiver2&& receiver, context* loop): task_base(&type::execute_impl), receiver_((Receiver2 &&) receiver), loop_(loop) {}void start() noexcept{loop_->enqueue(this);}private:static void execute_impl(task_base* t) noexcept {auto& self = *static_cast<type*>(t);execution::set_value(std::move(self.receiver_));// ... some code ignore here}Receiver receiver_;context* const loop_;};
可以看到,libunifex很巧妙的實(shí)現(xiàn)了一個(gè)繼承自task_base的operation類,比較重要的是兩點(diǎn):
在該類構(gòu)造的時(shí)候,會(huì)將最終處理receiver通知的execute_impl關(guān)聯(lián)為task的execute()的目標(biāo)函數(shù)。
該op類的start()方法,會(huì)將自己通過context::enqueue()加入到context的任務(wù)隊(duì)列中等待執(zhí)行通過這兩步,我們完成了execution與一個(gè)任務(wù)調(diào)度器結(jié)合的絕大部分工作,當(dāng)然,還差了connect()相關(guān)的處理代碼,這部分是由剩下的scheduler_task與scheduler來共同完成的。
此處的context由于最后的using,外部的使用名稱是manual_event_loop,這個(gè)我們需要注意一下,下文中會(huì)直接使用到這個(gè)名稱。
注意此處內(nèi)嵌類_op::type的定義方式,先是前置聲明了type,后續(xù)直接通過class _op::type來定義相關(guān)的類,而不是直接在_op類中直接使用內(nèi)嵌的方式來調(diào)用,這對于代碼閱讀是有利的,避免內(nèi)嵌類層級化后導(dǎo)致的代碼理解成本增加。
scheduler_task與scheduler:?在前文《C++異步:structured concurrency實(shí)現(xiàn)解析!》中我們介紹了Sender Factory,schedule()其實(shí)也是一個(gè)Sender Factory,而它產(chǎn)生的scheduler_task其實(shí)就是一個(gè)sender,所以它包含兩部分sender的實(shí)現(xiàn):
sender traits需要用到的類型定義,如value_types和error_types,這個(gè)地方都是void,因?yàn)樗兇饩褪且粋€(gè)時(shí)機(jī)控制節(jié)點(diǎn),不會(huì)向后續(xù)節(jié)點(diǎn)傳遞任何值。
connect()成員函數(shù),可以很明顯的看到此處只是簡單的返回前面我們介紹的繼承自task_base的operation<Receiver>。
scheduler_task是scheduler的內(nèi)嵌類,此處為了方便閱讀我們將scheduler_task外置:
class schedule_task {public:template </*...*/>using value_types = /*unspecified*/;template </*...*/>using error_types = Variant<>;static constexpr bool sends_done = true;template <typename Receiver>operation<Receiver> connect(Receiver&& receiver) const {return operation<Receiver>{(Receiver &&) receiver, loop_};}private:friend scheduler;explicit schedule_task(context* loop) noexcept: loop_(loop){}context* const loop_;};
class scheduler {explicit scheduler(context* loop) noexcept : loop_(loop) {}public:schedule_task schedule() const noexcept {return schedule_task{loop_};}friend bool operator==(scheduler a, scheduler b) noexcept {return a.loop_ == b.loop_;}friend bool operator!=(scheduler a, scheduler b) noexcept {return a.loop_ != b.loop_;}private:context* loop_;};
另外回到上一節(jié)中我們提到的問題,context的整個(gè)實(shí)現(xiàn)是使用的raw pointer的task,并沒有對task的生命周期做管理,而此處構(gòu)造的task對象,也是棧上的值類型,這是因?yàn)檎麄€(gè)execution機(jī)制,都是依賴于前面提到過的operation state本身來保證生命周期的,對于此處來說,connect()的時(shí)候產(chǎn)生的棧對象operation<Receiver>,結(jié)束信號set_value調(diào)用完成前,也就是我們真正執(zhí)行到operation<Receiver>::execute()的時(shí)候,它都是有效的。
這也就巧妙的保證了,雖然我們context的實(shí)現(xiàn)使用的是raw pointer,并沒有對task的生命周期進(jìn)行管理,但它的生命周期也會(huì)是符合預(yù)期的。
利用operation state進(jìn)行生命周期管理,合理安排臨時(shí)對象,也是execution本身的一大特色,及區(qū)別于其他異步庫如asio的部分,這部分大家可以多與其他實(shí)現(xiàn)做橫向?qū)Ρ龋w會(huì)其中的優(yōu)缺點(diǎn),更容易把握到庫本身所偏向的表達(dá)方法。
四、context與物理線程關(guān)聯(lián)
前面我們也提到過,context&task的封裝本身是不包含線程的,所以業(yè)務(wù)層使用, 還需要wrapper一下, 比如single_thread_context的實(shí)現(xiàn):
class single_thread_context {manual_event_loop loop_;std::thread thread_;public:context() : loop_(), thread_([this] { loop_.run(); }) {}~context() {loop_.stop();thread_.join();}auto get_scheduler() noexcept {return loop_.get_scheduler();}std::thread::id get_thread_id() const noexcept {return thread_.get_id();}};
整體實(shí)現(xiàn)本身是對前面說的manual_event_loop的一個(gè)封裝, single_thread_context創(chuàng)建時(shí),會(huì)自動(dòng)創(chuàng)建一個(gè)線程對象并綁定manual_event_loop::run(),自動(dòng)開啟任務(wù)執(zhí)行循環(huán)。下面的例子子我們可以看到對single_thread_context的使用。
五、執(zhí)行過程簡述
我們結(jié)合一個(gè)簡單的示例代碼來看一下整體的執(zhí)行過程:
single_thread_context tcontext;int count = 0;| then([&] { ++count; }) | sync_wait();
假設(shè)我們在manual_event_loop.hpp中下面的函數(shù)中斷點(diǎn):
template <typename Receiver>operation<Receiver> connect(Receiver&& receiver) const {return operation<Receiver>{(Receiver &&) receiver, loop_};}
也就是構(gòu)造manual_event_loop::operation對象的地方(其實(shí)它就是一個(gè)context::task了),我們?nèi)菀追治龅剑琹oop就是我們傳入的context,后續(xù)start()的時(shí)候我們會(huì)把operation推送到它的task隊(duì)列中去等待執(zhí)行。此處模板嵌套的比較深,我們就不給出具體的調(diào)用棧了。我們可以來看下此處的receiver的類型:
execution::_then::_receiver<Reciever = execution::_sync_wait::_receiver<execution::_unit::unit>::type,Func =`Execution_TestAllocatePipe_Test::TestBody'::`2'::<lambda_1>>::type &&
結(jié)合上面的示例,我們比較容易分析出整體的類型大致形成的過程,這里不詳細(xì)贅述了。有了具體的operation,通過前面的分析我們知道,start()的時(shí)候,相應(yīng)的task會(huì)被加入到context的任務(wù)隊(duì)列中等待執(zhí)行,最后就成功的調(diào)用到了then()中包含的lambda,驅(qū)動(dòng)count計(jì)數(shù)+1,因?yàn)閟ync_wait()本身是等待執(zhí)行的,所以棧上聲明的tcontext,count都能夠正確的起作用。需要注意的是,如果不是同步等待的情況,這里的用法肯定是不適用的。
六、context的其他使用
除了標(biāo)準(zhǔn)的task執(zhí)行支持,以及與execution的結(jié)合,libunifex中還有一種context的特殊使用,以一個(gè)獨(dú)立的context,用作sync_wait()實(shí)現(xiàn)中的異步等待,這個(gè)作用類似我們經(jīng)常在其他異步庫看到的fence,libunifex這個(gè)地方偷了個(gè)懶,直接復(fù)用了context來做相關(guān)的實(shí)現(xiàn)。為了方便大家理解,這里我們沿用前面的示例,直接給出VS的Parallel Stacks執(zhí)行情況,來方便大家直觀感受相關(guān)實(shí)現(xiàn)的具體執(zhí)行情況:

如上所示,這種sync_wait的方式,導(dǎo)致主線程在異步任務(wù)執(zhí)行完成前,會(huì)無休眠的利用另外一個(gè)context::run空跑一個(gè)while(true)循環(huán),直到對應(yīng)task執(zhí)行完成最終調(diào)用set_value(),才通過signal_complete()調(diào)用了主線程上這個(gè)context的stop()函數(shù),退出這個(gè)死循環(huán)。這個(gè)實(shí)現(xiàn)還是有點(diǎn)簡單直白了,實(shí)際業(yè)務(wù)使用,這種實(shí)現(xiàn)容易帶來問題。當(dāng)然,相關(guān)的改良方法我們后面會(huì)提到,這里不再進(jìn)行展開了。
七、transfer-如何在執(zhí)行中切換scheduler
除了前面介紹的整個(gè)pipeline在單個(gè)scheduler執(zhí)行的情況,我們肯定容易想到,業(yè)務(wù)場景中比較容易出現(xiàn),pipeline的不同部分需要工作在不同的scheduler上,這點(diǎn)是如何做到的呢?這部分我們先擱置一下,在后續(xù)的文章中將具體介紹相關(guān)的實(shí)現(xiàn),libunifex本身不包含相關(guān)的實(shí)現(xiàn),感覺可能原因是這部分與scheduler本身的實(shí)現(xiàn)比較強(qiáng)相關(guān),而我們知道這塊libunifex并沒有做得特別好。
八、其他的scheduler與context實(shí)現(xiàn)
除了上面介紹的manual_event_loop,以及它的帶線程的single_thread_context封裝,libunifex還包含其他調(diào)度器的實(shí)現(xiàn),相關(guān)的用途和實(shí)現(xiàn)這里簡單列出,感興趣的讀者可以自行翻閱代碼理解,相關(guān)的實(shí)現(xiàn)除了專有功能的實(shí)現(xiàn),主體的封裝方式和技巧都比較類同,理解了manual_event_loop的實(shí)現(xiàn),再來看其他的實(shí)現(xiàn),不會(huì)有太多的障礙。感覺大部分的實(shí)現(xiàn)跟實(shí)際業(yè)務(wù)的預(yù)期都會(huì)存在一定的距離,這里不詳細(xì)分析了。不同的scheduler實(shí)現(xiàn)如下:
single_thread_context
前文已經(jīng)介紹過了,創(chuàng)建一個(gè)后臺(tái)線程對發(fā)起到它之上的任務(wù)進(jìn)行執(zhí)行的調(diào)度器。
inline_scheduler
其實(shí)就是不調(diào)度,相關(guān)的operation_state::start()時(shí)直接執(zhí)行set_value,不存在task加入任務(wù)隊(duì)列等待執(zhí)行的過程。
trampoline_scheduler
限制單次最大執(zhí)行數(shù)的調(diào)度器,感覺這應(yīng)該作為其他調(diào)度器的一種可選功能,而不是作為一個(gè)單獨(dú)的調(diào)度器來實(shí)現(xiàn)。
new_thread_context
遇到新的調(diào)度就嘗試創(chuàng)建一條新線程去執(zhí)行相關(guān)任務(wù)的調(diào)度器。
static_thread_pool
可以簡單看成single_thread_context的線程池版本,其實(shí)可以直接考慮復(fù)用manual_event_loop的實(shí)現(xiàn),不過這個(gè)地方是直接獨(dú)立代碼實(shí)現(xiàn)的,代碼大量類同。
thread_unsafe_event_loop
區(qū)別于manual_event_loop,非線程安全的一版實(shí)現(xiàn)。
timed_single_thread_context
支持schedule_at()和schedule_after()這兩個(gè)時(shí)間調(diào)度功能的調(diào)度器實(shí)現(xiàn)。
linux::io_uring_context
Linux io_uring的專有調(diào)度器實(shí)現(xiàn),io_uring是linux下比較完整的操作系統(tǒng)級Async IO實(shí)現(xiàn)(對標(biāo)Windows的完成端口)。
win::windows_thread_pool
利用Windows系統(tǒng)支持的線程池和Timer實(shí)現(xiàn)的Windows專用調(diào)度器。
win::low_latency_iocp_context
與linux::io_uring一樣,利用iocp實(shí)現(xiàn)的調(diào)度器。
九、總結(jié)
從scheduler這部分來說,libunifex本身的實(shí)現(xiàn)也不是盡善盡美,貼合實(shí)際業(yè)務(wù)需求的,這可能本身與大部分異步庫的抽象理念也有關(guān),一般會(huì)剝離掉線程調(diào)度相關(guān)的那部分代碼,各類業(yè)務(wù)差異巨大,都會(huì)有自己習(xí)慣的任務(wù)調(diào)度方式,這部分更多的意義感覺是讓我們理解應(yīng)該如何橋接一個(gè)自有的調(diào)度器與execution框架,而特定的調(diào)度器,更應(yīng)該從業(yè)務(wù)側(cè)出發(fā),根據(jù)業(yè)務(wù)需求合理規(guī)劃設(shè)計(jì)實(shí)現(xiàn)了。
參考資料:
1.libunifex源碼庫
作者簡介
沈芳
騰訊后臺(tái)開發(fā)工程師
IEG研發(fā)效能部開發(fā)人員,畢業(yè)于華中科技大學(xué)。目前負(fù)責(zé)CrossEngine Server的開發(fā)工作,對GamePlay技術(shù)比較感興趣。
推薦閱讀
手把手實(shí)踐一個(gè)DAPP,通往Web3.0之路!
C++異步變化:libunifex實(shí)現(xiàn)!

溫馨提示:因公眾號平臺(tái)更改了推送規(guī)則,公眾號推送的文章文末需要點(diǎn)一下“贊”和“在看”,新的文章才會(huì)第一時(shí)間出現(xiàn)在你的訂閱列表里噢~
