<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          TDSQL for PG 并行框架原理解析

          共 17954字,需瀏覽 36分鐘

           ·

          2024-07-17 21:36

          并行方式簡(jiǎn)介

          查詢并行

          查詢并行是指將一個(gè)查詢分解為多個(gè)子查詢,在多個(gè)處理器上同時(shí)執(zhí)行這些子查詢。查詢并行通常用于處理計(jì)算密集型和IO密集型的查詢,例如,涉及多個(gè)表連接、聚合、表掃描等操作的查詢。查詢并行可以有效地提高查詢性能,因?yàn)槊總€(gè)處理器只需要處理查詢的一部分。

          這種并行方式在傳統(tǒng)數(shù)據(jù)庫(kù)中使用比較多,比如Oracle、PostgreSQL,TDSQL for PG 也采用的是這種并行方式。這種方式的好處是能將查詢?nèi)蝿?wù)分解為多個(gè)任務(wù),分布在多個(gè)處理器(甚至跨服務(wù)器的處理器)上并行執(zhí)行,最終通過(guò) Gather 節(jié)點(diǎn)將結(jié)果匯總。

          相比其他的并行方式,查詢并行的調(diào)度更簡(jiǎn)單,正因?yàn)槿绱耍?strong>資源的使用效率不是最高的。另外,這種并行方式需要在處理器之間傳輸和同步數(shù)據(jù),系統(tǒng)開(kāi)銷(xiāo)較大。

          pipeline并行

          管道 (pipeline) 并行是指將一個(gè)操作的輸出作為另一個(gè)操作的輸入,這樣多個(gè)操作可以同時(shí)進(jìn)行。這種并行方式通常用于數(shù)據(jù)庫(kù)查詢處理中的多個(gè)階段,例如,從磁盤(pán)讀取數(shù)據(jù)、過(guò)濾數(shù)據(jù)、排序數(shù)據(jù)等。

          pipeline并行可以提高資源利用率,因?yàn)?pipeline 中的各個(gè)階段、pipeline 之間可以并行、異步執(zhí)行,而不是等待前一個(gè)階段完成。

          ClickHouse、Doris 等使用的就是這種并行方式。pipeline 并行的好處是能充分的利用資源,結(jié)合線程池技術(shù),可以非常精細(xì)的調(diào)度任務(wù),目的是提升數(shù)據(jù)處理的吞吐量。

          但是這種并行方式不夠靈活,因?yàn)槊總€(gè)處理階段的輸入輸出是固定的,限制了處理階段之間的交互和協(xié)作,同時(shí)還需要管理和協(xié)調(diào)好各個(gè)處理階段,提升了調(diào)度的復(fù)雜度。與之對(duì)應(yīng)的是 DAG(Directed Acyclic Graph) 方式,典型的產(chǎn)品就是 Spark。

          任務(wù)并行

          任務(wù)并行是指在多個(gè)處理器上同時(shí)執(zhí)行不同的任務(wù)。這種并行方式通常用于處理多個(gè)獨(dú)立的查詢或事務(wù)。任務(wù)并行可以提高系統(tǒng)的吞吐量,因?yàn)槎鄠€(gè)查詢或事務(wù)可以同時(shí)進(jìn)行。

          TDSQL for PG 的后臺(tái)任務(wù),比如 autovacuum、checkpointer 等就是這種并行方式,任務(wù)之間獨(dú)立執(zhí)行,互不干擾。

          數(shù)據(jù)并行

          數(shù)據(jù)并行是指在多個(gè)處理器上同時(shí)對(duì)數(shù)據(jù)集的不同部分執(zhí)行相同的操作。這通常是通過(guò)將數(shù)據(jù)劃分為多個(gè)分區(qū)來(lái)實(shí)現(xiàn)的,每個(gè)處理器負(fù)責(zé)處理一個(gè)分區(qū)。

          數(shù)據(jù)并行可以有效地提高查詢性能,因?yàn)槊總€(gè)處理器只需要處理數(shù)據(jù)的一部分。通常來(lái)說(shuō),上面的并行方式都會(huì)結(jié)合數(shù)據(jù)并行來(lái)執(zhí)行。

          指令并行

          本文指的指令并行是利用SIMD指令的并行,SIMD指令可以減少分支預(yù)測(cè)的開(kāi)銷(xiāo),提高內(nèi)存訪問(wèn)的局部性、cache的命中率。數(shù)據(jù)庫(kù)中的排序算法可以利用 SIMD 指令進(jìn)行并行比較和交換,join 也可以使用 SIMD 進(jìn)行并行的匹配,最常用的是壓縮和編碼用 SIMD 提升性能。

          TDSQL for PG 主要使用了查詢并行、數(shù)據(jù)并行、任務(wù)并行這幾種方式,本文重點(diǎn)要分析的是查詢并行的框架和原理。

          并行框架概述

          TDSQL for PG 并行框架總體流程

          在并行框架中有三種進(jìn)程角色,分部是 server 進(jìn)程,backend 進(jìn)程(也稱(chēng)作 leader 進(jìn)程)和 Background Worker 進(jìn)程。
          • server 進(jìn)程是資源調(diào)度進(jìn)程,負(fù)責(zé)進(jìn)程的分配
          • backend 是并行任務(wù)的發(fā)起進(jìn)程,負(fù)責(zé)并行執(zhí)行環(huán)境的初始化,也負(fù)責(zé)通過(guò) Gather 和 GatherMerge 節(jié)點(diǎn)匯總結(jié)果
          • Background Worker 進(jìn)程是任務(wù)的具體執(zhí)行者,并返回結(jié)果給backend 進(jìn)程。

          執(zhí)行的流程跟單進(jìn)程時(shí)一樣,都會(huì)依次調(diào)用 CreateQueryDesc(), ExecutorStart() , ExecutorRun(), ExecutorFinish(), ExecutorEnd() 函數(shù)。

          區(qū)別在于 Background Worker 需要先從動(dòng)態(tài)共享內(nèi)存中恢復(fù)執(zhí)行需要的環(huán)境,以及執(zhí)行結(jié)束后清理動(dòng)態(tài)內(nèi)存。

          TDSQL for PG 的并行框架主要流程如下圖所示:

          1.  Client 連接到 server 以后 server 進(jìn)程為其創(chuàng)建一個(gè) backend 進(jìn)程,banckend 進(jìn)程在生成執(zhí)行計(jì)劃的過(guò)程中識(shí)別出是否需要并行執(zhí)行,如果能并行執(zhí)行就會(huì)創(chuàng)建 Background Worker 進(jìn)程。

          2.  如果并行執(zhí)行,backend 進(jìn)程先調(diào)用ExecInitParallelPlan()函數(shù)初始化并行執(zhí)行需要的環(huán)境。
          包括執(zhí)行計(jì)劃的序列化(ExecSerializePlan()),動(dòng)態(tài)共享內(nèi)存初始化InitializeParallelDSM(), 動(dòng)態(tài)共享內(nèi)存初始化又包含動(dòng)態(tài)共享內(nèi)存段的創(chuàng)建,library、GUC、snapshot 等的序列化和拷貝。
          3.  接著后端進(jìn)程調(diào)用LaunchParallelWorkers()注冊(cè) Background Worker。
          注冊(cè)的方式是調(diào)用RegisterDynamicBackgroundWorker()查找可用的 Background Worker 槽位,如果找到就向 server 進(jìn)程發(fā)送PMSIGNAL_BACKGROUND_WORKER_CHANGE信號(hào)。
          4.  server 進(jìn)程處理信號(hào)(sigusr1_handler())
          調(diào)用BackgroundWorkerStateChange() 遍歷所有的 Background Worker 槽位,找到剛注冊(cè)的槽位,實(shí)例化一個(gè)RegisteredBgWorker并 push 到全局變量中。
          5.  接下來(lái) server 進(jìn)程調(diào)用maybe_start_bgworkers()遍歷BackgroundWorkerList
          為里面的每個(gè)RegisteredBgWorker fork進(jìn)程。fork 出來(lái)的進(jìn)程執(zhí)行ParallelWorkerMain(),ParallelWorkerMain()就是 background worker 的入口函數(shù)。
          并行框架的使用的大致流程如下:
          /* 進(jìn)入并行模式,阻止不安全的狀態(tài)修改 */
              
          EnterParallelMode();

          /* 創(chuàng)建并行執(zhí)行的上下文,并插入到全局鏈表 pcxt_list 中 */
          pcxt = CreateParallelContext();

          /* 估算變量占用的 DSM 的大小,包括變量本身的大小和 key 的大學(xué). */
          shm_toc_estimate_chunk(&pcxt->estimator, size);
          shm_toc_estimate_keys(&pcxt->estimator, keys);

          /* 創(chuàng)建 DSM 并拷貝數(shù)據(jù) */
          InitializeParallelDSM(pcxt);

          /* 在 DSM 中申請(qǐng)空間. */
          space = shm_toc_allocate(pcxt->toc, size);
          shm_toc_insert(pcxt->toc, key, space);

          /* 注冊(cè) background worker */
          LaunchParallelWorkers(pcxt);

          /* 執(zhí)行并行任務(wù)(計(jì)劃) */

          /* 等待并行 worker 執(zhí)行結(jié)束 */

          /* 讀取共享內(nèi)存中的結(jié)果 */

          DestroyParallelContext(pcxt);

          /* 退出并行執(zhí)行模式 */
          ExitParallelMode();

          通信機(jī)制

          并行執(zhí)行避免不了進(jìn)程或線程之間的通信,TDSQL for PG 的并行框架采用的是進(jìn)程模型,主要用到了兩種通信機(jī)制,一個(gè)是信號(hào),一個(gè)是共享內(nèi)存。

          1.  信號(hào)

          信號(hào)主要是控制流,在并行框架中,后端進(jìn)程注冊(cè) background worker 時(shí)向 server 進(jìn)程發(fā)送信號(hào)SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE)),server 進(jìn)程調(diào)用 sigusr1_handler()處理信號(hào),并創(chuàng)建 background worker 進(jìn)程。

          當(dāng) background worker 執(zhí)行結(jié)束,會(huì)通過(guò)信號(hào)通知 backend 進(jìn)程。

          2.  動(dòng)態(tài)共享內(nèi)存

          在 TDSQL for PG 的并行框架中,動(dòng)態(tài)共享內(nèi)存主要用來(lái)傳遞狀態(tài)和數(shù)據(jù)。
          從 backend 進(jìn)程傳遞給 background worker 的狀態(tài)主要有執(zhí)行計(jì)劃、GUC、事務(wù)信息、snapshot 信息等,這部分使用的動(dòng)態(tài)共享內(nèi)存在啟動(dòng)并行執(zhí)行的初始化階段調(diào)用InitializeParallelDSM()來(lái)完成。
          數(shù)據(jù)主要是從 background Worker 返回給 backend 的執(zhí)行結(jié)果和錯(cuò)誤信息,這些結(jié)果通過(guò)基于共享內(nèi)存的消息隊(duì)列shm_mq來(lái)傳遞,也就是 tuple queue, 每個(gè) background Worker 和 backend 之間都有一個(gè)消息隊(duì)列,是多對(duì)一的關(guān)系。
          ExecParallelCreateReaders()函數(shù)負(fù)責(zé)為每個(gè)background Worker 創(chuàng)建 tuple queue 。同樣的也會(huì)創(chuàng)建多對(duì)一的錯(cuò)誤消息隊(duì)列,用于 background Worker 傳遞具體的錯(cuò)誤信息給 backend。
          對(duì)于普通的 SELECT 語(yǔ)句,background Worker 寫(xiě)數(shù)據(jù)到 tuple queue,backend 進(jìn)程從 tuple queue 中讀取結(jié)果。
          TDSQL for PG 還實(shí)現(xiàn)了 INSERT 和 UPDATE 的并行執(zhí)行,此時(shí) background Worker 通過(guò)共享內(nèi)存中的變量把結(jié)果傳給 backend 進(jìn)程,而不需要通過(guò) tuple queue。

          關(guān)鍵數(shù)據(jù)結(jié)構(gòu)分析

          ParallelContext

          typedef struct ParallelContext
              
          {
          dlist_node node; /* 雙向鏈表的掛載點(diǎn) */
          SubTransactionId subid; /* 調(diào)用GetCurrentSubTransactionId獲取子事務(wù)ID */
          int nworkers; /* 計(jì)劃的Worker數(shù)量 */
          int nworkers_launched; /* 實(shí)際發(fā)起的Worker數(shù)量 */
          bool leader_participate; /* 主進(jìn)程是否參與執(zhí)行 */
          char *library_name; /* 庫(kù)的名稱(chēng),一般是postgres */
          char *function_name; /* background Worker的執(zhí)行函數(shù),用戶自定義,select對(duì)應(yīng)的是ParallelQueryMain */
          ErrorContextCallback *error_context_stack; /* 錯(cuò)誤上下文棧 */
          shm_toc_estimator estimator; /* 共享內(nèi)存大小估算 */
          dsm_segment *seg; /* 動(dòng)態(tài)共享內(nèi)存的狀態(tài)信息 */
          void *private_memory; /* 動(dòng)態(tài)共享內(nèi)存申請(qǐng)失敗后回退到非并行執(zhí)行是使用的內(nèi)存。*/
          shm_toc *toc; /* Shared Memory Table of Contents */
          ParallelWorkerInfo *worker; /* 是一個(gè)數(shù)組,每個(gè)Worker一個(gè),記錄Worker的信息 */
          int nknown_attached_workers; /* attach到error queue的Worker數(shù)量 */
          bool *known_attached_workers; /* 數(shù)組,標(biāo)記每個(gè)Worker attach的狀態(tài) */
          } ParallelContext;

          次創(chuàng)建ParallelContext后都會(huì)插入到雙向鏈表pcxt_list中,這個(gè)雙向鏈表用于記錄活躍的并行上下文。

          ParallelExecutorInfo

          typedef struct ParallelExecutorInfo
              
          {
          PlanState *planstate; /* plan subtree we're running in parallel */
          ParallelContext *pcxt; /* parallel context we're using */
          BufferUsage *buffer_usage; /* points to bufusage area in DSM */
          uint64 *processed_count; /* processed tuple count area in DSM */
          SharedExecutorInstrumentation *instrumentation; /* optional */
          struct SharedJitInstrumentation *jit_instrumentation; /* optional */
          dsa_area *area; /* points to DSA area in DSM */
          dsa_pointer param_exec; /* serialized PARAM_EXEC parameters */
          bool finished; /* set true by ExecParallelFinish */
          /* These two arrays have pcxt->nworkers_launched entries: */
          shm_mq_handle **tqueue; /* tuple queues for worker output */
          struct TupleQueueReader **reader; /* tuple reader/writer support */
          } ParallelExecutorInfo;

          這個(gè)數(shù)據(jù)結(jié)構(gòu)記錄了并行執(zhí)行時(shí)的各種信息,由函數(shù)mq_bytes_readmq_bytes_written按 8 bytes 讀寫(xiě),必須用 memory barrier 同步。

          shm_mq

          struct shm_mq
          {
                  slock_t mq_mutex;
          PGPROC *mq_receiver;
          PGPROC *mq_sender;
          pg_atomic_uint64 mq_bytes_read;
          pg_atomic_uint64 mq_bytes_written;
          Size mq_ring_size;
          bool mq_detached;
          uint8 mq_ring_offset;
                  char mq_ring[FLEXIBLE_ARRAY_MEMBER];
          };

          共享內(nèi)存中的隊(duì)列。

           mq_receivermq_bytes_read只能被 receiver 改變。同理mq_sendermq_bytes_writte 只能被 sender 改變。

           mq_receivermq_sendermq_mutex保護(hù),一旦設(shè)置就不能改變,所以設(shè)置以后可以無(wú)鎖的讀。

           mq_bytes_readmq_bytes_written按 8 bytes 讀寫(xiě),必須用 memory barrier 同步。

          shm_mq_handle

          struct shm_mq_handle
              
          {
          shm_mq *mqh_queue;
          dsm_segment *mqh_segment;
          BackgroundWorkerHandle *mqh_handle;
          char *mqh_buffer;
          Size mqh_buflen;
          Size mqh_consume_pending;
          Size mqh_send_pending;
          Size mqh_partial_bytes;
          Size mqh_expected_bytes;
          bool mqh_length_word_complete;
          bool mqh_counterparty_attached;
          MemoryContext mqh_context;
          };
          用于管理共享隊(duì)列的數(shù)據(jù)結(jié)構(gòu)。

          ● mqh_queue

          指向關(guān)聯(lián)的消息隊(duì)列

          ● mqh_segment

          指向包含該消息隊(duì)列的動(dòng)態(tài)共享內(nèi)存

          ● mqh_handle

          與該消息隊(duì)列綁定的后臺(tái)工作進(jìn)程,由shm_mq_attach()綁定。

          ● mqh_buffer

          對(duì)于超過(guò)ring buffer大小的數(shù)據(jù),或者出現(xiàn)了回卷的數(shù)據(jù),就把隊(duì)列中的chunk拷貝到mqh_buffer。

          ● mqh_buflen

          mqh_buflen 是mqh_buffer的長(zhǎng)度。

          ● mqh_consume_pending

          當(dāng)mqh_consume_pending超過(guò)環(huán)形緩沖區(qū)1/4大小時(shí),說(shuō)明數(shù)據(jù)已經(jīng)消費(fèi)掉了,需要更新共享內(nèi)存中的數(shù)據(jù)。
          ● mqh_send_pending

          已經(jīng)寫(xiě)到queue中,但是還沒(méi)有更新到共享內(nèi)存的數(shù)據(jù)大小。只有當(dāng)數(shù)據(jù)大小超過(guò) ring buffer 的 1/4,或者tuple queue慢了的時(shí)候才更新共享內(nèi)存。

          ●mqh_partial_bytes、mqh_expected_bytes、and mqh_length_word_complete

          這三個(gè)變量用來(lái)跟蹤非阻塞操作的狀態(tài),記錄的是length word的發(fā)送情況。
          當(dāng)調(diào)用者嘗試非阻塞操作時(shí),但是返回了SHM_MQ_WOULD_BLOCK那么需要稍后用相同的參數(shù)重新調(diào)用這個(gè)參數(shù),所以需要記錄狀態(tài)還有多少數(shù)據(jù)沒(méi)有被發(fā)送。
          發(fā)送數(shù)據(jù)時(shí)shm_mq_sendv()),先發(fā)送要發(fā)送的字節(jié)數(shù)nbytes(類(lèi)型是Size),mqh_length_word_complete 就是記錄nbytes的幾個(gè)字節(jié)是否都發(fā)送完了。
          此時(shí) mqh_partial_bytes 表示已經(jīng)發(fā)生了幾個(gè)字節(jié),也可以用于記錄 payload 發(fā)送了多少字節(jié)。
          ● mqh_length_word_complete
          用于跟蹤是否完整的接收或者發(fā)送了所有的數(shù)據(jù)。
          mqh_partial_bytes 記錄了讀或者寫(xiě)了多少bytes數(shù)據(jù),而mqh_expected_bytes 只記錄期望讀的負(fù)載數(shù)據(jù)的總大小。

          ● mqh_counterparty_attached

          用于記錄對(duì)手(sender 或者 receiver)是否已經(jīng)掛載到queue。從而不必要的 mutex 申請(qǐng)。

          ● mqh_context

          shm_mq_handle所在的上下文,所有內(nèi)存的申請(qǐng)都要在這個(gè)上下文內(nèi)進(jìn)行。


          關(guān)鍵函數(shù)分析

          ExecInitParallelPlan()

          該函數(shù)主要初始化并行執(zhí)行需要的一些基礎(chǔ)信息,在并行的發(fā)起節(jié)點(diǎn)調(diào)用,比如Gather、GatherMerge、RemoteFragment(分布式場(chǎng)景下也支持節(jié)點(diǎn)內(nèi)并行)等。

          這個(gè)函數(shù)的核心流程如下:

               

          ParallelExecutorInfo *
              
          ExecInitParallelPlan(PlanState *planstate, EState *estate,
          Bitmapset *sendParams, int nworkers,
          int64 tuples_needed)
          {
          ...

          /* 序列化執(zhí)行計(jì)劃 */
          pstmt_data = ExecSerializePlan(planstate->plan, estate);

          /* 為返回值申請(qǐng)空間 */
          pei = palloc0(sizeof(ParallelExecutorInfo));

          /* 創(chuàng)建并行框架上下文 */
          pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
          pei->pcxt = pcxt;

          /* 動(dòng)態(tài)共享內(nèi)存大小估算,為每個(gè)需要傳遞給background worker的變量估算內(nèi)存大小,包括執(zhí)行計(jì)劃、BufferUsage、tuple queues等。*/
          shm_toc_estimate_chunk(&pcxt->estimator, ...);
          shm_toc_estimate_keys(&pcxt->estimator, 1);
          ...

          /* 為每個(gè)可并行的node估算其需要的共享內(nèi)存大小 */
          ExecParallelEstimate(planstate, &e);

          ...

          /* 為DSA估算空間。DSA的大小可以在執(zhí)行過(guò)程中改變,所以可能會(huì)更新的狀態(tài)放到這個(gè)區(qū)域。DSA會(huì)綁定多個(gè)DSM,當(dāng)DSA大小不夠時(shí),可以創(chuàng)建新的DSM。*/
          shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
          shm_toc_estimate_keys(&pcxt->estimator, 1);

          /* 為并行框架建立動(dòng)態(tài)共享內(nèi)存段,并將Worker需要的狀態(tài) copy 到共享內(nèi)存。*/
          InitializeParallelDSM(pcxt);

          /* 在DSM中為并行執(zhí)行需要的狀態(tài)信息申請(qǐng)共享內(nèi)存并插入到toc中。*/
          shm_toc_allocate(pcxt->toc, ...);
          shm_toc_insert(pcxt->toc, ..., ...);
          ...

          /* 為每個(gè)Worker創(chuàng)建一個(gè)tuple queue,用于leader和Worker之間傳遞執(zhí)行結(jié)果。*/
          pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);

          ...

          /* 遍歷planstate中所有的node,為其初始化共享內(nèi)存,把狀態(tài)信息拷貝的共享內(nèi)存。*/
          ExecParallelInitializeDSM(planstate, &d);

          ...

          return pei;
          }

          ExecSerializePlan

          用于執(zhí)行計(jì)劃的序列化,序列化以后放入共享內(nèi)存,傳遞給background worker,再經(jīng)過(guò)反序列化。
          static char *
              
          ExecSerializePlan(Plan *plan, EState *estate)
          {
          /* 實(shí)際調(diào)用copyObjectImpl()對(duì)執(zhí)行計(jì)劃中的算子、表達(dá)式進(jìn)行深度拷貝,會(huì)遞歸調(diào)用一些列以“_copy”開(kāi)頭的函數(shù)。*/
          plan = copyObject(plan);

          /* 復(fù)制PlannedStmt,復(fù)制background worker必要的信息,最后序列化返回*/
          pstmt = makeNode(PlannedStmt);
          ...
          return nodeToString(pstmt);
          }

          CreateParallelContext

          ParallelContext *
              
          CreateParallelContext(const char *library_name, const char *function_name, int nworkers);

          創(chuàng)建并行框架的上下文,library_name 是要加載的庫(kù)的名稱(chēng),通常為“postgres”, function_name 是并行執(zhí)行函數(shù)的名稱(chēng)。

          在background worker進(jìn)程的入口函數(shù)ParallelWorkerMain()中會(huì)通過(guò)這個(gè)函數(shù)名從library中加載函數(shù)并執(zhí)行,nworkers 是并行執(zhí)行的進(jìn)程數(shù)。

          ExecParallelSetupTupleQueues()

          Worker 進(jìn)程執(zhí)行完 plan segment 后,結(jié)果通過(guò)共享內(nèi)存消息隊(duì)列傳遞給 leader 進(jìn)程,這個(gè)函數(shù)就是為每個(gè) worker 創(chuàng)建一個(gè)共享隊(duì)列shm_mq_handle

               

          static shm_mq_handle **
              
          ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
          {
          /* 為每個(gè)worker的shm_mq_handle申請(qǐng)內(nèi)存 */
          responseq = (shm_mq_handle **)
          palloc(pcxt->nworkers * sizeof(shm_mq_handle *));

          /* 如果不需要重新初始化,那么就在DSM中為每一個(gè)worker的tuple queue申請(qǐng)空間;否則就直接查找。 */
          if (!reinitialize)
          tqueuespace =
          shm_toc_allocate(pcxt->toc, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
          else
          tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);

          /* 為每個(gè) worker 創(chuàng)建消息隊(duì)列,并將 leader 進(jìn)程設(shè)置為接收者,然后將 mq、dsm_segment 關(guān)聯(lián)起來(lái)。*/
          for (i = 0; i < pcxt->nworkers; ++i)
          {
          shm_mq *mq;

          mq = shm_mq_create(tqueuespace +
          ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
          (Size) PARALLEL_TUPLE_QUEUE_SIZE);

          shm_mq_set_receiver(mq, MyProc);
          responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
          }

          /* 插入到toc中, 方便在 background worker 中查表恢復(fù) */
          if (!reinitialize)
          shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);

          return responseq;
          }

          LaunchParallelWorkers()

          發(fā)起background worker的函數(shù),主要是調(diào)用以下代碼來(lái)完成:RegisterDynamicBackgroundWorker()

               

          void
              
          LaunchParallelWorkers(ParallelContext *pcxt)
          {
          BackgroundWorker worker;
          ...

          /* 誰(shuí)發(fā)起worker誰(shuí)就是leader */
          BecomeLockGroupLeader();

          ...

          // 注冊(cè)worker
          for (i = 0; i < pcxt->nworkers; ++i)
          {
          memcpy(worker.bgw_extra, &i, sizeof(int));
          if (!any_registrations_failed &&
          RegisterDynamicBackgroundWorker(&worker,
          &pcxt->worker[i].bgwhandle))
          {
                                  /* 注冊(cè)成功 */
          }
          else
          {
          /* 當(dāng)超過(guò)了max_worker_processes的限制,則注冊(cè)失敗。設(shè)置any_registrations_failed = true,防止繼續(xù)注冊(cè)。
          any_registrations_failed = true;
          ...
          }

          ...

          }

          RegisterDynamicBackgroundWorker()

          這個(gè)函數(shù)主要是從BackgroundWorkerData中獲取一個(gè)可用的BackgroundWorkerSlot將其設(shè)置為已經(jīng)占用。

          然后給 server 發(fā)送一個(gè)PMSIGNAL_BACKGROUND_WORKER_CHANGE信號(hào),通知server ,background worker 的狀態(tài)有變化。

          此時(shí) server 遍歷BackgroundWorkerSlot找到剛注冊(cè)的 background worker,為其創(chuàng)建進(jìn)程。

               

          bool
              
          RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
          BackgroundWorkerHandle **handle)
          {
          ...

          for (slotno = 0; slotno < BackgroundWorkerData->total_slots; ++slotno)
          {

          if (!slot->in_use)
          {
          ...
          }
          }

          /* If we found a slot, tell the postmaster to notice the change. */
          if (success)
          SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);

          if (success && handle)
          {
          *handle = palloc(sizeof(BackgroundWorkerHandle));
          (*handle)->slot = slotno;
          (*handle)->generation = generation;
          }
          }

          ParallelWorkerMain

          background worker 進(jìn)程的入口函數(shù),屬于并行框架內(nèi)固定的函數(shù)。
          由這個(gè)函數(shù)調(diào)用實(shí)際的執(zhí)行函數(shù),對(duì)于select、update、Insert 語(yǔ)句,執(zhí)行函數(shù)就是ParallelQueryMain()對(duì)于并行創(chuàng)建索引,執(zhí)行函數(shù)就是_bt_parallel_build_main()
          調(diào)用CreateParallelContext()創(chuàng)建ParallelContext時(shí),執(zhí)行函數(shù)的名稱(chēng)作為參數(shù)傳遞,例如CreateParallelContext("postgres", "ParallelQueryMain", nworkers)
          這個(gè)函數(shù)的主要任務(wù)就是從共享內(nèi)存中反操作讀取信息,準(zhǔn)備 background worker 執(zhí)行需要的環(huán)境。

               

          void
              
          ParallelWorkerMain(Datum main_arg)
          {
          ...

          /* attach 共享內(nèi)存,讀取 toc中的內(nèi)容 */
          seg = dsm_attach(DatumGetUInt32(main_arg));
          toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));

          /* 注冊(cè)退出時(shí)的回調(diào)函數(shù) */
          on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

          /* 設(shè)置錯(cuò)誤消息隊(duì)列,將當(dāng)前worker設(shè)為發(fā)送者 */
          error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
          mq = (shm_mq *) (error_queue_space +
          ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
          shm_mq_set_sender(mq, MyProc);
          mqh = shm_mq_attach(mq, seg, NULL);

          /* 從庫(kù)中查找background worker要執(zhí)行的函數(shù),例如ParallelQueryMain。*/
          entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
          library_name = entrypointstate;
          function_name = entrypointstate + strlen(library_name) + 1;

          entrypt = LookupParallelWorkerFunction(library_name, function_name);

          ...

          /* 多次調(diào)用 shm_toc_lookup(shm_toc *toc, uint64 key, bool noError), 從 toc 中讀取狀態(tài)、參數(shù)等 */
          ...

          /* 執(zhí)行 ParallelQueryMain、_bt_parallel_build_main 等。*/
          entrypt(seg, toc);

          /* 退出并行模式 */
          ExitParallelMode();

          PopActiveSnapshot();

          EndParallelWorkerTransaction();

          DetachSession();
          ...
          }

          ParallelQueryMain

          并行 query 的入口函數(shù),在ParallelWorkerMain()中被調(diào)用。

               

          void
              
          ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
          {
          ...

          /* 設(shè)置tuple的接收者 */
          receiver = ExecParallelGetReceiver(seg, toc);

          /* 反序列化 PlannedStmt,ParamListInfo,SQL,并創(chuàng)建 QueryDesc */
          queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);

          ...

          ExecutorStart(queryDesc, fpes->eflags);

          ...

          /* 初始化 PlanState,根據(jù)node類(lèi)型調(diào)用不同的ExecXXXInitializeWorker();*/
          ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);

          ...

          ExecutorRun(queryDesc,
          ForwardScanDirection,
          fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
          true);

          ....

          ExecutorEnd(queryDesc);

          ...
          }


          性能分析與優(yōu)化

          使用并行框架能提升執(zhí)行的效率,但是也帶來(lái)了額外的開(kāi)銷(xiāo),經(jīng)過(guò)實(shí)際的測(cè)試,這個(gè)開(kāi)銷(xiāo)大概在5 ~ 10毫秒,也就是說(shuō)啟動(dòng)平行框架需要5毫秒以上。其中這個(gè)開(kāi)銷(xiāo)的主要有以下幾部分組成:

          1.  序列化

          序列化包括 plan,planstate,snap,GUC、library 等。GUC 的序列化耗時(shí)最高。
          GUC序列化耗時(shí)包括兩部分,一部分是遍歷所有的 GUC 參數(shù),估算參數(shù)占用的內(nèi)存大小,一部分是將所有參數(shù)序列化。因?yàn)?TDSQL for PG 的參數(shù)有超過(guò) 800 個(gè),內(nèi)存大小估算耗時(shí)大概 100 微秒,序列化耗時(shí)更高。
          GUC參數(shù)占用內(nèi)存的大小是不變的。因此可以不用每次啟動(dòng)并行框架時(shí)計(jì)算一次,可以放在系統(tǒng)啟動(dòng)階段時(shí)完成,比如放到build_guc_variables()函數(shù)中。
          因?yàn)槊看螆?zhí)行的參數(shù)可能會(huì)不一樣,所以不能在系統(tǒng)啟動(dòng)階段完成,可以在系統(tǒng)啟動(dòng)時(shí)序列化,啟動(dòng)并行框架時(shí)判斷參數(shù)是否有變化,如果有就重新序列化并保存。
          2.  動(dòng)態(tài)共享內(nèi)存的申請(qǐng)
          動(dòng)態(tài)共享內(nèi)存的申請(qǐng)主要耗時(shí)點(diǎn)在函數(shù)dsm_create()并行框架初始化過(guò)程中有兩個(gè)地方調(diào)用這個(gè)函數(shù)。
          一個(gè)是 GetSessionDsmHandle() 中調(diào)用,用來(lái)申請(qǐng) session 內(nèi)部進(jìn)程共享的內(nèi)存,每個(gè)session只需要調(diào)用一次;一次是為并行上下文申請(qǐng)共享內(nèi)存。每一次調(diào)用100微秒以上。
          3.  進(jìn)程間數(shù)據(jù)傳輸
          background worker 進(jìn)程將結(jié)果傳遞給 leader 進(jìn)程的耗時(shí)也不可避免,跟數(shù)據(jù)量成正比。
          調(diào)整shmq 的緩沖區(qū)大小并不能提升性能。因此需要結(jié)合優(yōu)化器把這部分代價(jià)也加入到執(zhí)行計(jì)劃之中。


          -- 更多精彩 --

          技術(shù)干貨丨TDSQL 列存引擎 LibraDB 計(jì)算模型的設(shè)計(jì)與思考


          技術(shù)干貨丨 TDSQL for MySQL DDL執(zhí)行框架


          分布式數(shù)據(jù)庫(kù)時(shí)代,需要什么樣的產(chǎn)品?



          瀏覽 111
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  8x8x最新地址 | 亚洲无码在线免费观看视频吗? | 激情网站五月天 | 日日于夜夜操免费视频 | 欧美亚州视频 |