<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          C++異步從理論到實踐總覽篇

          共 17936字,需瀏覽 36分鐘

           ·

          2022-06-28 19:13

          作者:fangshen,騰訊 IEG 客戶端開發(fā)工程師

          C++20帶來了coroutine特性, 同時新的execution也在提案過程中, 這兩者都給我們在C++中解決異步問題帶來了新的思路. 但對比其他語言的實現(xiàn), C++的協(xié)程和后續(xù)的execution都存在一定的理解和封裝成本, 本系列的分享我們將圍繞基本的原理, 相應的封裝, 以及剝析優(yōu)秀的第三方實現(xiàn), 最終結合筆者framework落地的情況來展開.

          1. 糾結的開篇

          之前設計我們游戲用的c++框架的時候, 剛好c++20的coroutine已經(jīng)發(fā)布, 又因為是專門 給game server用的c++ framework, 對多線程的訴求相對有限, 或者本著少并發(fā)少奇怪的錯誤的原則, 除網(wǎng)絡和IO和日志等少量模塊外, 大部分模塊主要還是工作在主線程上的, 所以當時設計的重點也就放在了c++20 coroutine的包裝和使用上, 更多的使用coroutine來完善異步的支持. 但如果考慮到framework作為前后端公用框架的話, 原來主要針對主線程使用的包裝的coroutine調(diào)度器就顯得有些不夠用, 以此作為基礎, 我們開始了嘗試結合比較新的c++異步思路, 來重新思考應該如何實現(xiàn)一個盡量利用c++新特性, 業(yè)務層簡單易用的異步框架了.

          本系列的主要內(nèi)容也是圍繞這條主線來鋪開, 過程中我們 主要以:

          1. 自有的framework異步實現(xiàn) - 主要落地嘗試利用c++20的coroutine實現(xiàn)一個業(yè)務級的調(diào)度器.
          2. asio - 這個應該不用多說了, 近年來一直高頻迭代, 業(yè)界廣泛使用的開源第三方庫, 中間的異步任務調(diào)度, 網(wǎng)絡部分的代碼實現(xiàn)都非常優(yōu)質(zhì).
          3. libunifex - 最接近當前sender/receiver版 execution提案的可實操版本, c++17/20兼容, 但不推薦使用c++17的版本進行任何嘗試, 原因后續(xù)文件會展開.
            這幾個庫作為基礎, 逐步展開我們對c++異步的探索, 然后再回到落地實踐這條主線上, 探討一個業(yè)務側(cè)使用簡單, 內(nèi)部高效的異步庫應該如何來實現(xiàn)并落地.  當然, 我們的側(cè)重點主要還是c++異步的調(diào)度和處理上, 網(wǎng)絡相關的有部分內(nèi)容可能會簡單提到, 但不會進行深入的展開. ??其實整個嘗試的過程只能說非常不順利了, 當然, 隨著對相關實現(xiàn)的深入理解和細節(jié)的深挖, 收益也是頗多的. 閑話不多說了, 我們直接切入主題, 以對異步的思考來展開這篇總覽的內(nèi)容.

          2. 前塵往事 - rstudio framework實現(xiàn)

          rstudio framework的異步框架由兩塊比較獨立的部分組成:

          1. 一部分是源自asio幾年前版本的post和strand部分實現(xiàn), 另外附加了一些業(yè)務側(cè)較常用的像Fence等對象;
          2. 另外一部分是主線程的協(xié)程調(diào)度器實現(xiàn), 這部分最早是基于c++17實現(xiàn)的一版stackless 協(xié)程; 另外一版則是gcc11.1正式發(fā)布后, 直接用c++20重構了整個實現(xiàn), 直接使用c++20的coroutine的一個版本.

          2.1 asio 部分

          ??這一部分的內(nèi)容因為后續(xù)有asio scheduler實現(xiàn)具體的分析篇章, 這個地方主要以業(yè)務側(cè)使用進行展開了.

          2.1.1 executor概述
          • 來源于1.6X boost同期的asio standalone版本
          • 去除了各平臺網(wǎng)絡處理相關的代碼
          • 僅保留了post和相關的功能(新版本有executor實現(xiàn))
          • 早期c++11兼容, 無coroutine支持
          • 除網(wǎng)絡庫外, asio非常有使用價值的一部分代碼
          2.1.2  一個簡單的使用示例
            GJobSystem->Post([]() {
                  //some calculate task here
                  //...
                  GJobSystem->Post(
                      []() {
                          //task notify code here
                          //...
                      },
                      rstudio::JobSystemType::kLogicJob);
                }, rstudio::JobSystemType::kWorkJob);

          相關的時序圖:

          2.1.3  當前框架使用的線程結構

          預定義的枚舉值:

          enum class JobSystemType : int {
            kLogicJob = 0,       // logic thread(main thread)
            kWorkJob,            // work thread
            kSlowJob,            // slow work thread(run io or other slow job)
            kNetworkJob,         // add a separate thread for network
            kNetworkConnectJob,  // extra connect thread for network
            kLogJob,             // log thread
            kNotifyExternalJob,  // use external process to report something, 1 thread only~~
            kTotalJobTypes,
          };

          不同Job說明:

          • kLogicJob

            • 主線程(邏輯線程)執(zhí)行任務
          • kWorkJob

            • Work Thread線程池執(zhí)行任務(多個), 一般是計算量可控的小任務
          • kSlowJob

            • IO專用線程池, IO相關的任務投遞到本線程池
          • kNetworkJob

            • 目前tbuspp專用的處理線程
          • kNetworkConnectJob

            • 專用的網(wǎng)絡連接線程, tbuspp模式下不需要
          • kLogJob

            • 日志專用線程, 目前日志模塊是自己起的線程, 可以歸并到此處管理
          • kNotifyExternalJob

            • 專用的通知線程, 如lua error的上報, 使用該類型
          2.1.4  Timer任務相關

          相關接口:

          //NoIgnore version
          uint64_t JobSystemModule::AddAlwaysRunJob(JobSystemType jobType,
                threads::ThreadJobFunction&& periodJob, 
                unsigned long periodTimeMs)
          ;

          uint64_t JobSystemModule::AddTimesRunJob(JobSystemType jobType, 
                threads::ThreadJobFunction&& periodJob, 
                unsigned long periodTimeMs, 
                unsigned int runCount)
          ;
                
          uint64_t JobSystemModule::AddDelayRunJob(JobSystemType jobType,  
                threads::ThreadJobFunction&& periodJob,
                unsigned long delayTimeMs)
          ;
                
          void JobSystemModule::KillTimerJob(uint64_t tid);

          本部分并未直接使用asio原始的basic_waitable_timer實現(xiàn), 而是自己實現(xiàn)的定時任務.

          2.1.5 在線程池上關聯(lián)執(zhí)行任務 - Strand
          • 特定的情況下, 被派發(fā)到Work線程池的任務存在依賴關系

          • 需要串聯(lián)執(zhí)行的時候, 這個時候我們需要額外的設施 JobStrand

          • 來保證任務是按先后依賴關系來串行執(zhí)行的

          • 如下圖中part1, part2, part3, part4串行執(zhí)行的情況所示

          示例代碼:

          auto strand = GJobSystem->RequestStrand(rstudio::JobSystemType::kWorkJob);
          starnd.Post([](){ 
              //part1~
              // ...
          });
          starnd.Post([](){ 
              //part2~
              // ...
          });
          starnd.Post([](){ 
              //part3~ 
              // ...
          });
          starnd.Post([](){ 
              //part4~ 
              // ...
          });
          starnd.Post([](){ 
              GJobSystem->Post([](){
                  //return code here
                  // ...
              }, rstudio::JobSystemType::kLogicJob); 
          });
          2.1.6 其他輔助設施
          JobFence
          jobs::JobFencePtr JobSystemModule::RequestFence();
          • 字面義, 柵欄, 起到攔截執(zhí)行的作用.
          • 一般多用于模塊的初始化和結束
          • 如tbuspp在kNetworkJob上的初始化和結束.

          示例代碼(TcpService的初始化):

          job_system_module_->Post(
              [this, workTicket]() {
                  if (!workTicket || workTicket->IsExpired()) return;

                  InitInNetworkThread();
              },
              JobSystemType::kNetworkJob);

          period_task_ptr = job_system_module_->AddAlwaysRunJob(
              JobSystemType::kNetworkJob,
              [this, workTicket]() {
                  if (!workTicket || workTicket->IsExpired()) return;

                  LoopInNetworkThread();
              },
              10);

          fence_->FenceTo((int)JobSystemType::kNetworkJob);
          fence_->Wait();
          JobNotify && JobWaiter
          jobs::JobWaiterPtr JobSystemModule::RequestWaiter();
          jobs::JobNotifyPtr JobSystemModule::RequestNotify();
          • 批量任務管理使用
          • 等待的方式的區(qū)別
            • JobNotify: 執(zhí)行完成調(diào)用額外指定的回調(diào).
            • JobWaiter: 以Wait的方式在特定線程等待所有Job執(zhí)行完成.
          JobTicket
          jobs::JobTicketPtr JobSystemModule::RequestTicket();
          • 令牌對象
          • 一般用來處理跨線程的生命周期控制
          • 回調(diào)之前先通過IsExpired()來判斷對應對象是否已經(jīng)釋放

          示例代碼:

          GJobSystem->Post(
            [this, workTicket]() {
           if (!workTicket || workTicket->IsExpired()) return;

           InitInNetworkThread();
            },
            JobSystemType::kNetworkJob);

          2.2 asio 與其他實現(xiàn)的對比

          ??正好今年的GDC上有一個<<One Frame In Halo Infinite>>的分享, 里面主要講述的是對Halo Infinite的引擎升級, 提供新的JobSystem和新的動態(tài)幀的機制來支撐項目的, 我們直接以它為例子來對比一下framework和Halo的實現(xiàn), 并且也借用Halo Infinite的例子, 來更好的了解這種lambda post模式的缺陷, 以及可以改進的點. ??Halo引入新的JobSystem主要是為了將老的Tetris結構的并發(fā)模式:

          向新的基于Dependency的圖狀結構遷移:

          他使用的JobSystem的業(yè)務Api其實很簡單, 我們直接來看一下相關的代碼:

          JobSystem& jobSsytem = JobSystem::Get();
          JobGraphHandle graphHandle = jobSystem.CreateJobGraph();

          JobHandle jobA = jobSystem.AddJob( 
           graphHandle, 
           "JobA",
           [](){...} );

          JobHandle jobB = jobSystem.AddJob(
           graphHandle,
           "JobB",
           [](){...} );

          jobSystem.AddJobToJobDependency(jobA, jobB);

          jobSystem.SubmitJobGraph(graphHandle);

          通過這樣的機制, 就很容易形成如:

          另外還有一個用于同步的SyncPoint:

          JobSystem& jobSystem = JobSystem::Get();
          JobGraphHandle graphHandle = jobSystem.CreateJobGraph();

          SyncPointHandle syncPointX = jobSystem.CreateSyncPoint(graphHandle, "SyncPointX");

          JobHandle jobA = jobSystem.AddJob(graphHandle, "JobA", [](){...});
          JobHandle jobB = jobSystem.AddJob(graphHandle, "JobB", [](){...});

          jobSystem.AddJobToSyncPointDependency(jobA, syncPointX);
          jobSystem.AddSyncPointToJobDependency(syncPointX, jobB);

          jobSystem.SubmitJobGraph(graphHandle);

          大致的作用如下:

          這樣在workload主動觸發(fā)SyncPoint后, 整體執(zhí)行才會繼續(xù)往下推進, 這樣就能方便的加入一些主動的同步點對整個Graph的執(zhí)行做相關的控制了。

          回到asio, 我們前面也介紹了, 使用strand和post(), 我們也能很方便的構造出Graph形的執(zhí)行情況 , 而SyncPoint其實類型framework中提供的Event, 表達上會略有差異, 但很容易看出兩套實現(xiàn)其實是相當類同的. 這樣的話, Halo 的JobSystem有的所有優(yōu)缺點, framework基本也同樣存在了, 這里簡單搬運一下:

          對于復雜并發(fā)業(yè)務的表達以lambda內(nèi)嵌為主, 雖然這種方式盡可能保證所有代碼上下文是比較集中的, 對比純粹使用callback的模式有所進步, 但這種自由度過高的方式本身也會存在一些問題, 純粹靠編碼者來維系并發(fā)上下文的正確性,  這種情況下狀態(tài)值在lambda之間的傳遞也需要特別的小心,  容易出錯, 并且難以調(diào)試。

          2.3 coroutine實現(xiàn)部分

          coroutine部分之前的帖子里已經(jīng)寫得比較詳細了, 這里僅給出鏈接以及簡單的代碼示例:

          1. 如何在C++17中實現(xiàn)stackless coroutine以及相關的任務調(diào)度器
          2. C++20 Coroutine實例教學
          3. 另外還有一個purecpp大會的演講視頻,  主要內(nèi)容與上述的兩篇文章相關度比較高, 這里也給出相關的鏈接, 感興趣的同學可以自行觀看:C++20 coroutine原理與應用

          代碼示例:

          //C++ 20 coroutine
          auto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");
          mScheduler.CreateTask20([clientProxy]() 
                                  -> rstudio::logic::CoResumingTaskCpp20 {
              auto* task = rco_self_task();

              printf("step1: task is %llu\n", task->GetId());
              co_await rstudio::logic::cotasks::NextFrame{};
              
              printf("step2 after yield!\n");
              int c = 0;
              while (c < 5) {
                  printf("in while loop c=%d\n", c);
                  co_await rstudio::logic::cotasks::Sleep(1000);
                  c++;
              }
              for (c = 0; c < 5; c++) {
                  printf("in for loop c=%d\n", c);
                  co_await rstudio::logic::cotasks::NextFrame{};
              }

              printf("step3 %d\n", c);
              auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false
                                              []()-> logic::CoResumingTaskCpp20 {
                  printf("from child coroutine!\n");
                  co_await rstudio::logic::cotasks::Sleep(2000);
                  printf("after child coroutine sleep\n");
              });
              printf("new task create in coroutine: %llu\n", newTaskId);
              printf("Begin wait for task!\n");
              co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 };
              printf("After wait for task!\n");

              rstudio::logic::cotasks::RpcRequest 
                  rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000};
              auto* rpcret = co_await rpcReq;
              if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {
                  assert(rpcret->totalRet == 1);
                  auto retval = rpcret->retValue.to<int>();
                  assert(retval == 4);
                  printf("rpc coroutine run suc, val = %d!\n", retval);
              }
              else {
                  printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType);
              }
              co_await rstudio::logic::cotasks::Sleep(5000);
              printf("step4, after 5s sleep\n");
              co_return rstudio::logic::CoNil;
          } );

          執(zhí)行結果:

          step1: task is 1
          step2 after yield!
          in while loop c=0
          in while loop c=1
          in while loop c=2
          in while loop c=3
          in while loop c=4
          in for loop c=0
          in for loop c=1
          in for loop c=2
          in for loop c=3
          in for loop c=4
          step3 5
          new task create in coroutine: 2
          Begin wait for task!
          from child coroutine!
          after child coroutine sleep
          After wait for task!
          service yield call finish!
          rpc coroutine run suc, val = 4!
          step4, after 5s sleep

          整體來看, 協(xié)程的使用還是給異步編程帶來了很多便利, 但框架本身的實現(xiàn)其實還是有比較多迭代優(yōu)化的空間的:

          1. asio的調(diào)度部分與coroutine部分的實現(xiàn)是分離的
          2. coroutine暫時只支持主線程

          2.4 小結

          上面也結合halo的實例說到了一些限制, 那么這些問題有沒有好的解決辦法了, 答案是肯定的, 雖然execution并未完全通過提案, 但整體而言, execution新的sender/reciever模型, 對于解決上面提到的一些缺陷, 應該是提供了非常好的思路, 我們下一章節(jié)中繼續(xù)展開.


          3. so easy - execution就是解?

          最開始的想法其實比較簡單, 結合原來的framework, 適當引入提案中的execution一些比較可取的思路, 讓framework的異步編程能更多的吸取c++新特性和execution比較高級的框架抽象能力, 提升整個異步庫的實現(xiàn)質(zhì)量. 所以最開始定的主線思路其實是更多的向execution傾斜, 怎么了解掌握execution, 怎么與現(xiàn)在的framework結合成了主線思路.

          我們選擇的基礎參考庫是來自沖元宇宙這波改名的Meta公司的libunifex, 客觀來說, Meta公司的folly庫, 以及l(fā)ibunifex庫的實現(xiàn)質(zhì)量, 肯定都是業(yè)界前沿的, 對c++新特性的使用和探索, 也是相當給力的. 這些我們后續(xù)在分析libunifex具體實現(xiàn)的篇章中也能實際感受到.

          但深入了解libunifex后, 我們會發(fā)現(xiàn), 它的優(yōu)點有不少:

          1. 嘗試為c++提供表達異步的框架性結構.
          2. 泛型用得出神入化, ponder在它前面基本是小弟級別的, 一系列泛用性特別強的template 編程示例, 比如隱含在sender/receiver思路內(nèi)的lazy evaluate表達, 如何在大量使用泛型的情況下提供業(yè)務定制點等等.
          3. 結構化的表達并發(fā)和異步, 相關代碼的編寫從自由發(fā)揮自主把控走向框架化, 約束化, 能夠更有序更可靠的表達復雜異步邏輯
          4. 整個執(zhí)行pipeline的組織, 所有信息是compile time和runtime完備的, dependencies不會丟失.
          5. 節(jié)點之間的值類型是強制檢查的, 有問題的情況 , 大多時候compiler time就會報錯. 有不少優(yōu)點的同時, 也有很多缺點:
          6. 整個庫的實現(xiàn)嚴重依賴了c++20 ranges采用的一種定制手段 cpo, 并且也使用了類似ranges的pipe表達方法, 理解相關代碼存在一定的門坎.(后續(xù)會有具體的篇章展開相關的內(nèi)容)
          7. 庫同時向下兼容了c++17, 但由于c++17本身特性的限制, 引入了大量的宏, 以及X Macros展開的方式, 導致相關的代碼閱讀難度進一步提升. 但實際上c++17版本并不具備可維護的價值, 依賴SIFINAE的實現(xiàn), 如果中間任何一環(huán)報錯, 必然需要在N屏的報錯中尋找有效信息.
          8. libunifex對coroutine的支持存疑, 雖然讓coroutine可以作為一種reciever存在, 但本質(zhì)上來說, coroutine其實更適合拿來做流程控制的膠水, 而不是作為異步中的某個節(jié)點存在.
          9. 默認的scheduler實現(xiàn)質(zhì)量離工業(yè)級還存在一定的距離, 這一點后續(xù)的代碼分析中也會具體提到. 諸多問題的存在, 可能也是execution提案沒有短時間內(nèi)獲得通過的原因吧, 但整體來說, execution本身的理念還是很有參考價值的, 但以它的現(xiàn)狀來說, 離最終的解肯定還是有比較大的距離的.

          4. 嘗試重新思考 - 要什么, 用什么

          事情到這個點就有點尷尬了, 原有的asio, 架構層面來說, 跟新的execution是存在落差的. 而項目實踐上來說, asio相當穩(wěn)扎穩(wěn)打, 而以libunifex當前的狀態(tài)來說, 離工業(yè)化使用其實是有一定距離的. 但asio作者在21年時候的兩篇演講(更像coding show):

          1. Talking Async Ep1: Why C++20 is the Awesomest Language for Network Programming
          2. Talking Async Ep2: Cancellation in depth第一篇基本整個演示了asio從最開始的callback, 到融入c++20 coroutine后的優(yōu)雅異步表達, 我們可以通過下面的代碼片斷感受一下:

          asio相關示例代碼1

          awaitable<voidlisten(tcp::acceptor& acceptor, tcp::endpoint target)
          {
            for (;;)
            {
              auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
              if (e)
                break;

              auto ex = client.get_executor();
              co_spawn(ex, proxy(std::move(client), target), detached);
            }
          }

          asio相關示例代碼2

            auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
            if (!e)
            {
              co_await (
                  (
                    transfer(client, server, client_to_server_deadline) ||
                    watchdog(client_to_server_deadline)
                  )
                  &&
                  (
                    transfer(server, client, server_to_client_deadline) ||
                    watchdog(server_to_client_deadline)
                  )
                );
            }

          對比原來每個async_xxx()函數(shù)后接callback的模式, 整個實現(xiàn)可以說是相當?shù)膬?yōu)雅了, 代碼的可讀性也得到了極大的提高, 這兩段代碼都來自于上面的演講中, 想深入了解的可以直接打開相關的鏈接觀看視頻, 很推薦大家去看一下. ??能夠把復雜的事情用更簡潔易懂的方法表達, 這肯定是讓人振奮的, 當然, 深入了解相關實現(xiàn)后, 也會發(fā)現(xiàn)存在一些問題, 但我們的本意是參考學習, 得出最終想要的可以比較好的支撐并發(fā)和異步業(yè)務的基礎框架, 有這些, 其實已經(jīng)可以理出一條比較清晰的思路了:

          1. execution部分主要使用它的sender/receiver概念, 和它提供的一些通用的算法. 移除掉所有因為fallback c++17引入的大量代碼噪聲. 拋棄它并不完備的各種scheduler實現(xiàn)
          2. 協(xié)程借鑒部分asio的思路, 首先讓協(xié)程可以基于context上下文, 在跨線程的情況下使用, 另外更多還是使用原有框架有明確的scheduler的方式對所有協(xié)程進行管理和定制的模式.
          3. 使用asio的scheduler部分作為execution的底層scheduler實現(xiàn), 同時也使用asio的timer表達, 去除原始libunifex依賴不同scheduler提供schedule_at()方法來執(zhí)行定時器相關邏輯的實現(xiàn).
          4. 根據(jù)業(yè)務需要, 定制一些必要的sender adapter等簡化業(yè)務的使用.
          5. 嘗試用execution框架對接ISPC等特殊的并發(fā)庫, 能夠以一個清晰的方式來表達這種混合環(huán)境上執(zhí)行的邏輯.

          本系列涉及的基礎知識和相關內(nèi)容比較多, 先給出一個臨時的大綱, 后續(xù)可能會有調(diào)整. 目前的思路是先介紹大家相對熟悉度不那么高的execution基礎知識和libunifex, 后面再介紹asio相關的scheduler以及coroutine實現(xiàn), 最后再回歸筆者正在迭代的framework, 這樣一個順序來展開.

          參考

          1. One Frame in Halo Infinite
          2. asio官網(wǎng)
          3. libunifex源碼庫
          瀏覽 55
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  2024国产中文一区二区在线播放 | 色综合第一页 | 精精品人妻一区二区三区 | 手机在线操B视频 | 久久久久大香蕉 |