<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink SQL窗口表值函數(shù)Window TVF聚合實(shí)現(xiàn)原理淺析

          共 10348字,需瀏覽 21分鐘

           ·

          2022-05-15 20:41

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
          回復(fù)"面試"獲取更多驚喜
          八股文教給我,你們專(zhuān)心刷題和面試

          Hi,我是王知無(wú),一個(gè)大數(shù)據(jù)領(lǐng)域的原創(chuàng)作者。
          放心關(guān)注我,獲取更多行業(yè)的一手消息。

          引子

          表值函數(shù)(table-valued function, TVF),顧名思義就是指返回值是一張表的函數(shù),在Oracle、SQL Server等數(shù)據(jù)庫(kù)中屢見(jiàn)不鮮。而在Flink的上一個(gè)穩(wěn)定版本1.13中,社區(qū)通過(guò)FLIP-145提出了窗口表值函數(shù)(window TVF)的實(shí)現(xiàn),用于替代舊版的窗口分組(grouped window)語(yǔ)法。

          舉個(gè)栗子,在1.13之前,我們需要寫(xiě)如下的Flink SQL語(yǔ)句來(lái)做10秒的滾動(dòng)窗口聚合:

          SELECT?TUMBLE_START(procTime,?INTERVAL?'10'?SECONDS)?AS?window_start,merchandiseId,COUNT(1)?AS?sellCount
          FROM?rtdw_dwd.kafka_order_done_log
          GROUP?BY?TUMBLE(procTime,?INTERVAL?'10'?SECONDS),merchandiseId;

          在1.13版本中,則可以改寫(xiě)成如下的形式:

          SELECT?window_start,window_end,merchandiseId,COUNT(1)?AS?sellCount
          FROM?TABLE(?TUMBLE(TABLE?rtdw_dwd.kafka_order_done_log,?DESCRIPTOR(procTime),?INTERVAL?'10'?SECONDS)?)
          GROUP?BY?window_start,window_end,merchandiseId;

          根據(jù)設(shè)計(jì)文檔的描述,窗口表值函數(shù)的思想來(lái)自2019年的SIGMOD論文<>,而表值函數(shù)屬于SQL 2016標(biāo)準(zhǔn)的一部分。Calcite從1.25版本起也開(kāi)始提供對(duì)滾動(dòng)窗口和滑動(dòng)窗口TVF的支持。除了標(biāo)準(zhǔn)化、易于實(shí)現(xiàn)之外,窗口TVF還支持舊版語(yǔ)法所不具備的一些特性,如Local-Global聚合優(yōu)化、Distinct解熱點(diǎn)優(yōu)化、Top-N支持、GROUPING SETS語(yǔ)法等。

          接下來(lái)本文簡(jiǎn)單探究一下基于窗口TVF的聚合邏輯,以及對(duì)累積窗口TVF做一點(diǎn)簡(jiǎn)單的改進(jìn)。

          SQL定義

          窗口TVF函數(shù)的類(lèi)圖如下所示。

          Flink SQL在Calcite原生的SqlWindowTableFunction的基礎(chǔ)上加了指示窗口時(shí)間的三列,即window_start、window_endwindow_time。SqlWindowTableFunction及其各個(gè)實(shí)現(xiàn)類(lèi)的主要工作是校驗(yàn)TVF的操作數(shù)是否合法(通過(guò)內(nèi)部抽象類(lèi)AbstractOperandMetadata和對(duì)應(yīng)的子類(lèi)OperandMetadataImpl)。這一部分不再贅述,在下文改進(jìn)累積窗口TVF的代碼中會(huì)涉及到。

          物理計(jì)劃

          目前窗口TVF不能單獨(dú)使用,需要配合窗口聚合或Top-N一起使用。以上文中的聚合為例,觀察其執(zhí)行計(jì)劃如下。

          EXPLAIN?
          SELECT?window_start,window_end,merchandiseId,COUNT(1)?AS?sellCount
          FROM?TABLE(?TUMBLE(TABLE?rtdw_dwd.kafka_order_done_log,?DESCRIPTOR(procTime),?INTERVAL?'10'?SECONDS)?)
          GROUP?BY?window_start,window_end,merchandiseId;

          ==?Abstract?Syntax?Tree?==
          LogicalAggregate(group=[{0,?1,?2}],?sellCount=[COUNT()])
          +-?LogicalProject(window_start=[$48],?window_end=[$49],?merchandiseId=[$10])
          ???+-?LogicalTableFunctionScan(invocation=[TUMBLE($47,?DESCRIPTOR($47),?10000:INTERVAL?SECOND)],?rowType=[RecordType(BIGINT?ts,?/*?......?*/,?TIMESTAMP_LTZ(3)?*PROCTIME*?procTime,?TIMESTAMP(3)?window_start,?TIMESTAMP(3)?window_end,?TIMESTAMP_LTZ(3)?*PROCTIME*?window_time)])
          ??????+-?LogicalProject(ts=[$0],?/*?......?*/,?procTime=[PROCTIME()])
          ?????????+-?LogicalTableScan(table=[[hive,?rtdw_dwd,?kafka_order_done_log]])

          ==?Optimized?Physical?Plan?==
          Calc(select=[window_start,?window_end,?merchandiseId,?sellCount])
          +-?WindowAggregate(groupBy=[merchandiseId],?window=[TUMBLE(time_col=[procTime],?size=[10?s])],?select=[merchandiseId,?COUNT(*)?AS?sellCount,?start('w$)?AS?window_start,?end('w$)?AS?window_end])
          ???+-?Exchange(distribution=[hash[merchandiseId]])
          ??????+-?Calc(select=[merchandiseId,?PROCTIME()?AS?procTime])
          ?????????+-?TableSourceScan(table=[[hive,?rtdw_dwd,?kafka_order_done_log]],?fields=[ts,?/*?......?*/])

          ==?Optimized?Execution?Plan?==
          Calc(select=[window_start,?window_end,?merchandiseId,?sellCount])
          +-?WindowAggregate(groupBy=[merchandiseId],?window=[TUMBLE(time_col=[procTime],?size=[10?s])],?select=[merchandiseId,?COUNT(*)?AS?sellCount,?start('w$)?AS?window_start,?end('w$)?AS?window_end])
          ???+-?Exchange(distribution=[hash[merchandiseId]])
          ??????+-?Calc(select=[merchandiseId,?PROCTIME()?AS?procTime])
          ?????????+-?TableSourceScan(table=[[hive,?rtdw_dwd,?kafka_order_done_log]],?fields=[ts,?/*?......?*/])

          在Flink SQL規(guī)則集中,與如上查詢(xún)相關(guān)的規(guī)則按順序依次是:

          • ConverterRuleStreamPhysicalWindowTableFunctionRule

            該規(guī)則將調(diào)用窗口TVF的邏輯節(jié)點(diǎn)(即調(diào)用SqlWindowTableFunction的LogicalTableFunctionScan節(jié)點(diǎn))轉(zhuǎn)化為物理節(jié)點(diǎn)(StreamPhysicalWindowTableFunction)。

          • ConverterRuleStreamPhysicalWindowAggregateRule

            該規(guī)則將含有window_startwindow_end字段的邏輯聚合節(jié)點(diǎn)FlinkLogicalAggregate轉(zhuǎn)化為物理的窗口聚合節(jié)點(diǎn)StreamPhysicalWindowAggregate以及其上的投影StreamPhysicalCalc。在有其他分組字段的情況下,還會(huì)根據(jù)FlinkRelDistribution#hash生成StreamPhysicalExchange節(jié)點(diǎn)。

          • RelOptRulePullUpWindowTableFunctionIntoWindowAggregateRule

            顧名思義,該規(guī)則將上面兩個(gè)規(guī)則產(chǎn)生的RelNode進(jìn)行整理,消除代表窗口TVF的物理節(jié)點(diǎn),并將它的語(yǔ)義上拉至聚合節(jié)點(diǎn)中,形成最終的物理計(jì)劃。

          然后,StreamPhysicalWindowAggregate節(jié)點(diǎn)翻譯成StreamExecWindowAggregate節(jié)點(diǎn),進(jìn)入執(zhí)行階段。

          切片化窗口與執(zhí)行

          我們知道粒度太碎的滑動(dòng)窗口會(huì)使得狀態(tài)和Timer膨脹,比較危險(xiǎn),應(yīng)該用滾動(dòng)窗口+在線(xiàn)存儲(chǔ)+讀時(shí)聚合的方法代替。社區(qū)在設(shè)計(jì)窗口TVF聚合時(shí)顯然考慮到了這點(diǎn),提出了切片化窗口(sliced window)的概念,并以此為基礎(chǔ)設(shè)計(jì)了一套與DataStream API Windowing不同的窗口機(jī)制。

          如下圖的累積窗口所示,每?jī)蓷l縱向虛線(xiàn)之間的部分就是一個(gè)切片(slice)。

          切片的本質(zhì)就是將滑動(dòng)/累積窗口化為滾動(dòng)窗口,并盡可能地復(fù)用中間計(jì)算結(jié)果,降低狀態(tài)壓力。自然地,前文所述的Local-Global聚合優(yōu)化、Distinct解熱點(diǎn)優(yōu)化就都可以無(wú)縫應(yīng)用了。

          那么,切片是如何分配的呢?答案是通過(guò)SliceAssigner體系,其類(lèi)圖如下。

          注意CumulativeSliceAssigner多了一個(gè)isIncremental()方法,這是下文所做優(yōu)化的一步

          可見(jiàn),對(duì)于滾動(dòng)窗口而言,一個(gè)窗口就是一個(gè)切片;而對(duì)滑動(dòng)/累積窗口而言,一個(gè)窗口可能包含多個(gè)切片,一個(gè)切片也可能位于多個(gè)窗口中。所以共享切片的窗口要特別注意切片的過(guò)期與合并。以負(fù)責(zé)累積窗口的CumulativeSliceAssigner為例,對(duì)應(yīng)的邏輯如下。

          @Override
          public?Iterable?expiredSlices(long?windowEnd)?{
          ????long?windowStart?=?getWindowStart(windowEnd);
          ????long?firstSliceEnd?=?windowStart?+?step;
          ????long?lastSliceEnd?=?windowStart?+?maxSize;
          ????if?(windowEnd?==?firstSliceEnd)?{
          ????????//?we?share?state?in?the?first?slice,?skip?cleanup?for?the?first?slice
          ????????reuseExpiredList.clear();
          ????}?else?if?(windowEnd?==?lastSliceEnd)?{
          ????????//?when?this?is?the?last?slice,
          ????????//?we?need?to?cleanup?the?shared?state?(i.e.?first?slice)?and?the?current?slice
          ????????reuseExpiredList.reset(windowEnd,?firstSliceEnd);
          ????}?else?{
          ????????//?clean?up?current?slice
          ????????reuseExpiredList.reset(windowEnd);
          ????}
          ????return?reuseExpiredList;
          }

          @Override
          public?void?mergeSlices(long?sliceEnd,?MergeCallback?callback)?throws?Exception?{
          ????long?windowStart?=?getWindowStart(sliceEnd);
          ????long?firstSliceEnd?=?windowStart?+?step;
          ????if?(sliceEnd?==?firstSliceEnd)?{
          ????????//?if?this?is?the?first?slice,?there?is?nothing?to?merge
          ????????reuseToBeMergedList.clear();
          ????}?else?{
          ????????//?otherwise,?merge?the?current?slice?state?into?the?first?slice?state
          ????????reuseToBeMergedList.reset(sliceEnd);
          ????}
          ????callback.merge(firstSliceEnd,?reuseToBeMergedList);
          }

          可見(jiàn),累積窗口的中間結(jié)果會(huì)被合并到第一個(gè)切片中。窗口未結(jié)束時(shí),除了第一個(gè)切片之外的其他切片觸發(fā)后都會(huì)過(guò)期。

          實(shí)際處理切片化窗口的算子名為SlicingWindowOperator,它實(shí)際上是SlicingWindowProcessor的簡(jiǎn)單封裝。SlicingWindowProcessor的體系如下。

          SlicingWindowProcessor的三個(gè)重要組成部分分別是:

          • WindowBuffer:在托管內(nèi)存區(qū)域分配的窗口數(shù)據(jù)緩存,避免在窗口未實(shí)際觸發(fā)時(shí)高頻訪(fǎng)問(wèn)狀態(tài);
          • WindowValueState:窗口的狀態(tài),其schema為[key, window_end, accumulator]。窗口結(jié)束時(shí)間作為窗口狀態(tài)的命名空間(namespace);
          • NamespaceAggsHandleFunction:通過(guò)代碼生成器AggsHandlerCodeGenerator生成的聚合函數(shù)體。注意它并不是一個(gè)AggregateFunction,但是大致遵循其規(guī)范。

          每當(dāng)一條數(shù)據(jù)到來(lái)時(shí),調(diào)用AbstractWindowAggProcessor#processElement()方法,比較容易理解了。

          @Override
          public?boolean?processElement(RowData?key,?RowData?element)?throws?Exception?{
          ????long?sliceEnd?=?sliceAssigner.assignSliceEnd(element,?clockService);
          ????if?(!isEventTime)?{
          ????????//?always?register?processing?time?for?every?element?when?processing?time?mode
          ????????windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
          ????}
          ????if?(isEventTime?&&?isWindowFired(sliceEnd,?currentProgress,?shiftTimeZone))?{
          ????????//?the?assigned?slice?has?been?triggered,?which?means?current?element?is?late,
          ????????//?but?maybe?not?need?to?drop
          ????????long?lastWindowEnd?=?sliceAssigner.getLastWindowEnd(sliceEnd);
          ????????if?(isWindowFired(lastWindowEnd,?currentProgress,?shiftTimeZone))?{
          ????????????//?the?last?window?has?been?triggered,?so?the?element?can?be?dropped?now
          ????????????return?true;
          ????????}?else?{
          ????????????windowBuffer.addElement(key,?sliceStateMergeTarget(sliceEnd),?element);
          ????????????//?we?need?to?register?a?timer?for?the?next?unfired?window,
          ????????????//?because?this?may?the?first?time?we?see?elements?under?the?key
          ????????????long?unfiredFirstWindow?=?sliceEnd;
          ????????????while?(isWindowFired(unfiredFirstWindow,?currentProgress,?shiftTimeZone))?{
          ????????????????unfiredFirstWindow?+=?windowInterval;
          ????????????}
          ????????????windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
          ????????????return?false;
          ????????}
          ????}?else?{
          ????????//?the?assigned?slice?hasn't?been?triggered,?accumulate?into?the?assigned?slice
          ????????windowBuffer.addElement(key,?sliceEnd,?element);
          ????????return?false;
          ????}
          }

          而當(dāng)切片需要被合并時(shí),先從WindowValueState中取出已有的狀態(tài),再遍歷切片,并調(diào)用NamespaceAggsHandleFunction#merge()方法進(jìn)行合并,最后更新?tīng)顟B(tài)。

          @Override
          public?void?merge(@Nullable?Long?mergeResult,?Iterable?toBeMerged)?throws?Exception?{
          ????//?get?base?accumulator
          ????final?RowData?acc;
          ????if?(mergeResult?==?null)?{
          ????????//?null?means?the?merged?is?not?on?state,?create?a?new?acc
          ????????acc?=?aggregator.createAccumulators();
          ????}?else?{
          ????????RowData?stateAcc?=?windowState.value(mergeResult);
          ????????if?(stateAcc?==?null)?{
          ????????????acc?=?aggregator.createAccumulators();
          ????????}?else?{
          ????????????acc?=?stateAcc;
          ????????}
          ????}
          ????//?set?base?accumulator
          ????aggregator.setAccumulators(mergeResult,?acc);
          ????//?merge?slice?accumulators
          ????for?(Long?slice?:?toBeMerged)?{
          ????????RowData?sliceAcc?=?windowState.value(slice);
          ????????if?(sliceAcc?!=?null)?{
          ????????????aggregator.merge(slice,?sliceAcc);
          ????????}
          ????}
          ????//?set?merged?acc?into?state?if?the?merged?acc?is?on?state
          ????if?(mergeResult?!=?null)?{
          ????????windowState.update(mergeResult,?aggregator.getAccumulators());
          ????}
          }

          看官若要觀察codegen出來(lái)的聚合函數(shù)的代碼,可在log4j.properties文件中加上:

          logger.codegen.name?=?org.apache.flink.table.runtime.generated
          logger.codegen.level?=?DEBUG

          一點(diǎn)改進(jìn)

          有很多天級(jí)聚合+秒級(jí)觸發(fā)的Flink作業(yè),在DataStream API時(shí)代多由ContinuousProcessingTimeTrigger實(shí)現(xiàn),1.13版本之前的SQL則需要添加table.exec.emit.early-fire系列參數(shù)。正式采用1.13版本后,累積窗口(cumulate window)完美契合此類(lèi)需求。但是,有些作業(yè)的key規(guī)模比較大,在一天的晚些時(shí)候會(huì)頻繁向下游Redis刷入大量數(shù)據(jù),造成不必要的壓力。因此,筆者對(duì)累積窗口TVF做了略有侵入的小改動(dòng),通過(guò)一個(gè)布爾參數(shù)INCREMENTAL可控制只輸出切片之間發(fā)生變化的聚合結(jié)果。操作很簡(jiǎn)單:

          • 修改SqlCumulateTableFunction函數(shù)的簽名,以及配套的窗口參數(shù)類(lèi)CumulativeWindowSpec等;
          • 修改SliceSharedWindowAggProcess#fireWindow()方法,如下。
          @Override
          public?void?fireWindow(Long?windowEnd)?throws?Exception?{
          ????sliceSharedAssigner.mergeSlices(windowEnd,?this);
          ????//?we?have?set?accumulator?in?the?merge()?method
          ????RowData?aggResult?=?aggregator.getValue(windowEnd);
          ????if?(!isWindowEmpty())?{
          ????????if?(sliceSharedAssigner?instanceof?CumulativeSliceAssigner
          ????????????????&&?((CumulativeSliceAssigner)?sliceSharedAssigner).isIncremental())?{
          ????????????RowData?stateValue?=?windowState.value(windowEnd);
          ????????????if?(stateValue?==?null?||?!stateValue.equals(aggResult))?{
          ????????????????collect(aggResult);
          ????????????}
          ????????}?else?{
          ????????????collect(aggResult);
          ????????}
          ????}
          ????//?we?should?register?next?window?timer?here,
          ????//?because?slices?are?shared,?maybe?no?elements?arrived?for?the?next?slices
          ????//?......
          }

          當(dāng)然,此方案會(huì)帶來(lái)訪(fǎng)問(wèn)狀態(tài)的overhead,后續(xù)會(huì)做極限壓測(cè)以觀察性能,并做適當(dāng)修改。


          如果這個(gè)文章對(duì)你有幫助,不要忘記?「在看」?「點(diǎn)贊」?「收藏」?三連啊喂!

          2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專(zhuān)家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇)

          互聯(lián)網(wǎng)最壞的時(shí)代可能真的來(lái)了
          我在B站讀大學(xué),大數(shù)據(jù)專(zhuān)業(yè)
          我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?
          193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下
          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線(xiàn)上問(wèn)題小盤(pán)點(diǎn)
          我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?
          在所有Spark模塊中,我愿稱(chēng)SparkSQL為最強(qiáng)!
          硬剛Hive | 4萬(wàn)字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
          數(shù)據(jù)治理方法論和實(shí)踐小百科全書(shū)
          標(biāo)簽體系下的用戶(hù)畫(huà)像建設(shè)小指南
          4萬(wàn)字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
          【面試&個(gè)人成長(zhǎng)】2021年過(guò)半,社招和校招的經(jīng)驗(yàn)之談
          大數(shù)據(jù)方向另一個(gè)十年開(kāi)啟 |《硬剛系列》第一版完結(jié)
          我寫(xiě)過(guò)的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章
          當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
          瀏覽 46
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产高清视频在线播放 | 欧美大香蕉四级片在线网站成熟 | 欧洲黄网| 日本黄色免费看 | 欧美成人电影在线 |