<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          C++異步:libunifex的scheduler實(shí)現(xiàn)!

          共 9869字,需瀏覽 20分鐘

           ·

          2022-07-16 23:00


          導(dǎo)語 | 本篇我們將集中介紹在cpu thread類型的execution context,不涉及異構(gòu)的execution context實(shí)現(xiàn)和調(diào)度。


          前篇《C++異步:libunifex中的concepts詳解!中我們相對深入的介紹了libunifex中的concepts的方方面面,對execution的整體設(shè)計(jì)框架有了一個(gè)基礎(chǔ)的認(rèn)知,本篇我們將繼續(xù)介紹作為execution執(zhí)行基石的scheduler的實(shí)現(xiàn)細(xì)節(jié)。本篇的介紹集中在cpu thread類型的execution context上,不涉及異構(gòu)的execution context實(shí)現(xiàn)和調(diào)度。


          一、scheduler的實(shí)現(xiàn)概述


          我們先來回顧一下scheduler在execution整體設(shè)計(jì)中的位置和作用:



          libunifex中的Scheduler其實(shí)就是一個(gè)輕量的Wrapper,真正負(fù)責(zé)異步任務(wù)執(zhí)行的是底層的Execution Context實(shí)現(xiàn)。對于非異構(gòu)的實(shí)現(xiàn), 這里的Execution Context一般代表一個(gè)Cpu線程或者一組Cpu線程(線程池),最簡單的情況是相關(guān)的任務(wù)被投遞到一個(gè)線程上來執(zhí)行。我們會(huì)通過Scheduler對相關(guān)的Execution Context再包裝一次,pipeline組織的過程中將只涉及Scheduler,但Scheduler內(nèi)部一般都會(huì)包含相關(guān)Exectuion Context的包裝與實(shí)現(xiàn)。


          未做任何加工的情況,我們能夠想象,所有事情都將一口氣在Assembly Thread上發(fā)生完畢,那如果我們要實(shí)現(xiàn)將異步操作調(diào)度到工作線程上執(zhí)行,應(yīng)該如何實(shí)現(xiàn)呢?這里我們直接以libunifex中比較常用的manual_event_loop實(shí)現(xiàn)來說明整個(gè)實(shí)現(xiàn)邏輯:



          直接看圖整個(gè)實(shí)現(xiàn)還是比較復(fù)雜的,但其實(shí)最重要的地方只有幾處


          • context實(shí)現(xiàn)-Work Thread本身的工作機(jī)制,它應(yīng)該是能夠主動(dòng)執(zhí)行自身包含的任務(wù)隊(duì)列的。


          • 與execution橋接-Work Thread提供機(jī)制,允許其它線程向自己插入待執(zhí)行的任務(wù),并且我們需要將相關(guān)的任務(wù)包裝為符合exection設(shè)計(jì)的形態(tài)。


          • context與物理線程關(guān)聯(lián)-我們最后肯定需要將context的運(yùn)行與一個(gè)具體的物理線程關(guān)聯(lián)起來,這樣context才能不斷的執(zhí)行投遞到其中的task。做到這幾點(diǎn),整個(gè)異步操作的執(zhí)行就自然的轉(zhuǎn)移到Work Thread了。


          下文我們將結(jié)合具體的代碼實(shí)現(xiàn)來分析這兩點(diǎn)是怎么達(dá)成的。



          二、context實(shí)現(xiàn)分析

          manual_event_loop版


          manual_event_loop版的context實(shí)現(xiàn)比較簡潔。它的默認(rèn)調(diào)度器實(shí)現(xiàn)的核心實(shí)現(xiàn)位于manual_event_loop.h&與大部分調(diào)度器實(shí)現(xiàn)類似,它采用context與task的相關(guān)抽象與線程是剝離的方式,主要完成兩部分的功能:


          • 實(shí)現(xiàn)一個(gè)標(biāo)準(zhǔn)的任務(wù)管理器,有標(biāo)準(zhǔn)的任務(wù)加入和執(zhí)行接口。


          • 完成與execution系統(tǒng)的橋接,這部分主要是由scheduler實(shí)現(xiàn)來完成的。??


          下面我們分開來看一下這兩部分的實(shí)現(xiàn):


          (一)context與task_base的實(shí)現(xiàn)


          一個(gè)標(biāo)準(zhǔn)的任務(wù)管理器在很多地方我們可能都會(huì)用到,比如libunifex中的異步任務(wù)調(diào)度,比如一些定時(shí)器的實(shí)現(xiàn)。相關(guān)的代碼實(shí)現(xiàn)一般都比較簡單,libunifex的實(shí)現(xiàn)也同樣如此,相關(guān)的概覽圖如下:



          核心代碼如下:task_base:


          struct manual_event_loop::task_base {  using execute_fn = void(task_base*) noexcept;
          explicit task_base(execute_fn* execute) noexcept : execute_(execute){}
          void execute() noexcept { this->execute_(this); }
          task_base* next_ = nullptr; execute_fn* execute_;};


          context:


          class manual_event_loop::context {  void run() {    std::unique_lock lock{mutex_};    while (true) {      while (head_ == nullptr) {        if (stop_) return;        cv_.wait(lock);      }      auto* task = head_;      head_ = task->next_;      if (head_ == nullptr) {        tail_ = nullptr;      }      lock.unlock();      task->execute();      lock.lock();    }  }  void stop() {    std::unique_lock lock{mutex_};  stop_ = true;    cv_.notify_all();  } private:  void enqueue(task_base* task) {    std::unique_lock lock{mutex_};    if (head_ == nullptr) {      head_ = task;    } else {      tail_->next_ = task;    }    tail_ = task;    task->next_ = nullptr;    cv_.notify_one();  }
          std::mutex mutex_; std::condition_variable cv_; task_base* head_ = nullptr; task_base* tail_ = nullptr; bool stop_ = false;};


          上述利用鏈表實(shí)現(xiàn)了一個(gè)FIFO的task隊(duì)列,然后提供了一個(gè)enque()接口用于推送新任務(wù)到context,一個(gè)run()接口用于執(zhí)行整個(gè)任務(wù)隊(duì)列。核心代碼使用了一個(gè)std::mutex和std::condition_variable,模擬了一個(gè)類似semaphore的作用,這樣我們跨線程的執(zhí)行run(),enque(),核心代碼本身就是適配多線程的。代碼細(xì)節(jié)我們不一一展開了,相關(guān)的實(shí)現(xiàn)直接看代碼理解更方便。同時(shí)有心的讀者可能會(huì)注意到,libunifex的任務(wù)管理器實(shí)現(xiàn)用的是raw pointer的task_base,這樣會(huì)不會(huì)出現(xiàn)memory leak相關(guān)的問題呢?這部分的答案我們在下一章節(jié)解釋。


          從上面的代碼中可以看到,libunifex的實(shí)現(xiàn)比較多的依賴臨時(shí)命名空間(以'_'打頭的命名空間),然后再通過using給出外部的使用名稱。所以在代碼里我們可能會(huì)看到不同臨時(shí)命名空間下的很多context實(shí)現(xiàn),注意區(qū)分好相關(guān)的命名空間,同名的context一般都是類同的實(shí)現(xiàn),習(xí)慣了理解相關(guān)代碼也是比較方便的。



          三、與execution的橋接


          區(qū)別于其他的task處理框架,如asio的lambda post模式,execution框架是通過泛型的connect()和start()來完成對sender和receiver的連接和使用的,libunifex關(guān)聯(lián)context&task_base與execution框架其他部分的方式也沿續(xù)了這種思路。下面我們通過具體的代碼來看libunifex是如何通過對task_base泛型的支持,以及特定的connect() + start()的實(shí)現(xiàn),來完成相關(guān)的橋接的。tast_base的泛型實(shí)現(xiàn):我們先來看一下task_base的泛型實(shí)現(xiàn),其實(shí)也就是scheduler產(chǎn)生的sender和任意receiver執(zhí)行connect()操作后產(chǎn)生的OperationState:


          template <typename Receiver>class operation final : task_base { public:  template <typename Receiver2>  explicit type(Receiver2&& receiver, context* loop)    : task_base(&type::execute_impl)    , receiver_((Receiver2 &&) receiver)    , loop_(loop) {}
          void start() noexcept{ loop_->enqueue(this); }
          private: static void execute_impl(task_base* t) noexcept { auto& self = *static_cast<type*>(t); execution::set_value(std::move(self.receiver_)); // ... some code ignore here }
          Receiver receiver_; context* const loop_;};


          可以看到,libunifex很巧妙的實(shí)現(xiàn)了一個(gè)繼承自task_base的operation類,比較重要的是兩點(diǎn):


          • 在該類構(gòu)造的時(shí)候,會(huì)將最終處理receiver通知的execute_impl關(guān)聯(lián)為task的execute()的目標(biāo)函數(shù)。


          • 該op類的start()方法,會(huì)將自己通過context::enqueue()加入到context的任務(wù)隊(duì)列中等待執(zhí)行通過這兩步,我們完成了execution與一個(gè)任務(wù)調(diào)度器結(jié)合的絕大部分工作,當(dāng)然,還差了connect()相關(guān)的處理代碼,這部分是由剩下的scheduler_task與scheduler來共同完成的。


          此處的context由于最后的using,外部的使用名稱是manual_event_loop,這個(gè)我們需要注意一下,下文中會(huì)直接使用到這個(gè)名稱。


          注意此處內(nèi)嵌類_op::type的定義方式,先是前置聲明了type,后續(xù)直接通過class _op::type來定義相關(guān)的類,而不是直接在_op類中直接使用內(nèi)嵌的方式來調(diào)用,這對于代碼閱讀是有利的,避免內(nèi)嵌類層級化后導(dǎo)致的代碼理解成本增加。


          scheduler_task與scheduler:?在前文《C++異步:structured concurrency實(shí)現(xiàn)解析!》中我們介紹了Sender Factory,schedule()其實(shí)也是一個(gè)Sender Factory,而它產(chǎn)生的scheduler_task其實(shí)就是一個(gè)sender,所以它包含兩部分sender的實(shí)現(xiàn):


          • sender traits需要用到的類型定義,如value_types和error_types,這個(gè)地方都是void,因?yàn)樗兇饩褪且粋€(gè)時(shí)機(jī)控制節(jié)點(diǎn),不會(huì)向后續(xù)節(jié)點(diǎn)傳遞任何值。


          • connect()成員函數(shù),可以很明顯的看到此處只是簡單的返回前面我們介紹的繼承自task_base的operation<Receiver>。


          scheduler_task是scheduler的內(nèi)嵌類,此處為了方便閱讀我們將scheduler_task外置:


          class schedule_task { public:  template </*...*/>  using value_types = /*unspecified*/;
          template </*...*/> using error_types = Variant<>;
          static constexpr bool sends_done = true;
          template <typename Receiver> operation<Receiver> connect(Receiver&& receiver) const { return operation<Receiver>{(Receiver &&) receiver, loop_}; }
          private: friend scheduler;
          explicit schedule_task(context* loop) noexcept : loop_(loop) {}
          context* const loop_; };


          class scheduler { explicit scheduler(context* loop) noexcept : loop_(loop) {}
          public: schedule_task schedule() const noexcept { return schedule_task{loop_}; }
          friend bool operator==(scheduler a, scheduler b) noexcept { return a.loop_ == b.loop_; } friend bool operator!=(scheduler a, scheduler b) noexcept { return a.loop_ != b.loop_; }
          private: context* loop_; };


          另外回到上一節(jié)中我們提到的問題,context的整個(gè)實(shí)現(xiàn)是使用的raw pointer的task,并沒有對task的生命周期做管理,而此處構(gòu)造的task對象,也是棧上的值類型,這是因?yàn)檎麄€(gè)execution機(jī)制,都是依賴于前面提到過的operation state本身來保證生命周期的,對于此處來說,connect()的時(shí)候產(chǎn)生的棧對象operation<Receiver>,結(jié)束信號set_value調(diào)用完成前,也就是我們真正執(zhí)行到operation<Receiver>::execute()的時(shí)候,它都是有效的。


          這也就巧妙的保證了,雖然我們context的實(shí)現(xiàn)使用的是raw pointer,并沒有對task的生命周期進(jìn)行管理,但它的生命周期也會(huì)是符合預(yù)期的。


          利用operation state進(jìn)行生命周期管理,合理安排臨時(shí)對象,也是execution本身的一大特色,及區(qū)別于其他異步庫如asio的部分,這部分大家可以多與其他實(shí)現(xiàn)做橫向?qū)Ρ龋w會(huì)其中的優(yōu)缺點(diǎn),更容易把握到庫本身所偏向的表達(dá)方法。



          四、context與物理線程關(guān)聯(lián)


          前面我們也提到過,context&task的封裝本身是不包含線程的,所以業(yè)務(wù)層使用, 還需要wrapper一下, 比如single_thread_context的實(shí)現(xiàn):


          class single_thread_context {  manual_event_loop loop_;  std::thread thread_;public:  context() : loop_(), thread_([this] { loop_.run(); }) {}
          ~context() { loop_.stop(); thread_.join(); }
          auto get_scheduler() noexcept { return loop_.get_scheduler(); }
          std::thread::id get_thread_id() const noexcept { return thread_.get_id(); }};


          整體實(shí)現(xiàn)本身是對前面說的manual_event_loop的一個(gè)封裝, single_thread_context創(chuàng)建時(shí),會(huì)自動(dòng)創(chuàng)建一個(gè)線程對象并綁定manual_event_loop::run(),自動(dòng)開啟任務(wù)執(zhí)行循環(huán)。下面的例子子我們可以看到對single_thread_context的使用。



          五、執(zhí)行過程簡述


          我們結(jié)合一個(gè)簡單的示例代碼來看一下整體的執(zhí)行過程:


            single_thread_context tcontext;  int count = 0;  schedule(tcontext.get_scheduler()) | then([&] { ++count; }) | sync_wait();


          假設(shè)我們在manual_event_loop.hpp中下面的函數(shù)中斷點(diǎn):


          template <typename Receiver> operation<Receiver> connect(Receiver&& receiver) const { return operation<Receiver>{(Receiver &&) receiver, loop_}; }


          也就是構(gòu)造manual_event_loop::operation對象的地方(其實(shí)它就是一個(gè)context::task了),我們?nèi)菀追治龅剑琹oop就是我們傳入的context,后續(xù)start()的時(shí)候我們會(huì)把operation推送到它的task隊(duì)列中去等待執(zhí)行。此處模板嵌套的比較深,我們就不給出具體的調(diào)用棧了。我們可以來看下此處的receiver的類型:


          execution::_then::_receiver<  Reciever = execution::_sync_wait::_receiver<execution::_unit::unit>::type,  Func =`Execution_TestAllocatePipe_Test::TestBody'::`2'::<lambda_1>>::type &&


          結(jié)合上面的示例,我們比較容易分析出整體的類型大致形成的過程,這里不詳細(xì)贅述了。有了具體的operation,通過前面的分析我們知道,start()的時(shí)候,相應(yīng)的task會(huì)被加入到context的任務(wù)隊(duì)列中等待執(zhí)行,最后就成功的調(diào)用到了then()中包含的lambda,驅(qū)動(dòng)count計(jì)數(shù)+1,因?yàn)閟ync_wait()本身是等待執(zhí)行的,所以棧上聲明的tcontext,count都能夠正確的起作用。需要注意的是,如果不是同步等待的情況,這里的用法肯定是不適用的。



          六、context的其他使用


          除了標(biāo)準(zhǔn)的task執(zhí)行支持,以及與execution的結(jié)合,libunifex中還有一種context的特殊使用,以一個(gè)獨(dú)立的context,用作sync_wait()實(shí)現(xiàn)中的異步等待,這個(gè)作用類似我們經(jīng)常在其他異步庫看到的fence,libunifex這個(gè)地方偷了個(gè)懶,直接復(fù)用了context來做相關(guān)的實(shí)現(xiàn)。為了方便大家理解,這里我們沿用前面的示例,直接給出VS的Parallel Stacks執(zhí)行情況,來方便大家直觀感受相關(guān)實(shí)現(xiàn)的具體執(zhí)行情況:



          如上所示,這種sync_wait的方式,導(dǎo)致主線程在異步任務(wù)執(zhí)行完成前,會(huì)無休眠的利用另外一個(gè)context::run空跑一個(gè)while(true)循環(huán),直到對應(yīng)task執(zhí)行完成最終調(diào)用set_value(),才通過signal_complete()調(diào)用了主線程上這個(gè)context的stop()函數(shù),退出這個(gè)死循環(huán)。這個(gè)實(shí)現(xiàn)還是有點(diǎn)簡單直白了,實(shí)際業(yè)務(wù)使用,這種實(shí)現(xiàn)容易帶來問題。當(dāng)然,相關(guān)的改良方法我們后面會(huì)提到,這里不再進(jìn)行展開了。



          七、transfer-如何在執(zhí)行中切換scheduler


          除了前面介紹的整個(gè)pipeline在單個(gè)scheduler執(zhí)行的情況,我們肯定容易想到,業(yè)務(wù)場景中比較容易出現(xiàn),pipeline的不同部分需要工作在不同的scheduler上,這點(diǎn)是如何做到的呢?這部分我們先擱置一下,在后續(xù)的文章中將具體介紹相關(guān)的實(shí)現(xiàn),libunifex本身不包含相關(guān)的實(shí)現(xiàn),感覺可能原因是這部分與scheduler本身的實(shí)現(xiàn)比較強(qiáng)相關(guān),而我們知道這塊libunifex并沒有做得特別好。


          八、其他的scheduler與context實(shí)現(xiàn)


          除了上面介紹的manual_event_loop,以及它的帶線程的single_thread_context封裝,libunifex還包含其他調(diào)度器的實(shí)現(xiàn),相關(guān)的用途和實(shí)現(xiàn)這里簡單列出,感興趣的讀者可以自行翻閱代碼理解,相關(guān)的實(shí)現(xiàn)除了專有功能的實(shí)現(xiàn),主體的封裝方式和技巧都比較類同,理解了manual_event_loop的實(shí)現(xiàn),再來看其他的實(shí)現(xiàn),不會(huì)有太多的障礙。感覺大部分的實(shí)現(xiàn)跟實(shí)際業(yè)務(wù)的預(yù)期都會(huì)存在一定的距離,這里不詳細(xì)分析了。不同的scheduler實(shí)現(xiàn)如下:


          • single_thread_context


          前文已經(jīng)介紹過了,創(chuàng)建一個(gè)后臺(tái)線程對發(fā)起到它之上的任務(wù)進(jìn)行執(zhí)行的調(diào)度器。


          • inline_scheduler


          其實(shí)就是不調(diào)度,相關(guān)的operation_state::start()時(shí)直接執(zhí)行set_value,不存在task加入任務(wù)隊(duì)列等待執(zhí)行的過程。


          • trampoline_scheduler


          限制單次最大執(zhí)行數(shù)的調(diào)度器,感覺這應(yīng)該作為其他調(diào)度器的一種可選功能,而不是作為一個(gè)單獨(dú)的調(diào)度器來實(shí)現(xiàn)。


          • new_thread_context


          遇到新的調(diào)度就嘗試創(chuàng)建一條新線程去執(zhí)行相關(guān)任務(wù)的調(diào)度器。


          • static_thread_pool


          可以簡單看成single_thread_context的線程池版本,其實(shí)可以直接考慮復(fù)用manual_event_loop的實(shí)現(xiàn),不過這個(gè)地方是直接獨(dú)立代碼實(shí)現(xiàn)的,代碼大量類同。


          • thread_unsafe_event_loop


          區(qū)別于manual_event_loop,非線程安全的一版實(shí)現(xiàn)。


          • timed_single_thread_context


          支持schedule_at()和schedule_after()這兩個(gè)時(shí)間調(diào)度功能的調(diào)度器實(shí)現(xiàn)。


          • linux::io_uring_context


          Linux io_uring的專有調(diào)度器實(shí)現(xiàn),io_uring是linux下比較完整的操作系統(tǒng)級Async IO實(shí)現(xiàn)(對標(biāo)Windows的完成端口)。


          • win::windows_thread_pool


          利用Windows系統(tǒng)支持的線程池和Timer實(shí)現(xiàn)的Windows專用調(diào)度器。


          • win::low_latency_iocp_context


          與linux::io_uring一樣,利用iocp實(shí)現(xiàn)的調(diào)度器。



          九、總結(jié)


          從scheduler這部分來說,libunifex本身的實(shí)現(xiàn)也不是盡善盡美,貼合實(shí)際業(yè)務(wù)需求的,這可能本身與大部分異步庫的抽象理念也有關(guān),一般會(huì)剝離掉線程調(diào)度相關(guān)的那部分代碼,各類業(yè)務(wù)差異巨大,都會(huì)有自己習(xí)慣的任務(wù)調(diào)度方式,這部分更多的意義感覺是讓我們理解應(yīng)該如何橋接一個(gè)自有的調(diào)度器與execution框架,而特定的調(diào)度器,更應(yīng)該從業(yè)務(wù)側(cè)出發(fā),根據(jù)業(yè)務(wù)需求合理規(guī)劃設(shè)計(jì)實(shí)現(xiàn)了。


          參考資料:

          1.libunifex源碼庫


           作者簡介


          沈芳

          騰訊后臺(tái)開發(fā)工程師

          IEG研發(fā)效能部開發(fā)人員,畢業(yè)于華中科技大學(xué)。目前負(fù)責(zé)CrossEngine Server的開發(fā)工作,對GamePlay技術(shù)比較感興趣。



           推薦閱讀


          5G正當(dāng)時(shí),無人駕駛未來將駛向何方?

          手把手實(shí)踐一個(gè)DAPP,通往Web3.0之路!

          C++異步:libunifex中的concepts詳解!

          C++異步變化:libunifex實(shí)現(xiàn)!



          溫馨提示:因公眾號平臺(tái)更改了推送規(guī)則,公眾號推送的文章文末需要點(diǎn)一下“贊”“在看”,新的文章才會(huì)第一時(shí)間出現(xiàn)在你的訂閱列表里噢~

          瀏覽 147
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天日天天草 | 8090操逼网 | 操逼网址视频 | 一区二区入口 | 日韩一区二区无码视频 |