Flink SQL Window Table-Valued Function (Window TVF) Aggregation: A Look at the Implementation

Introduction
A table-valued function (TVF), as the name suggests, is a function that returns a table; they are commonplace in databases such as Oracle and SQL Server. In Flink's most recent stable release, 1.13, the community introduced an implementation of window table-valued functions (window TVFs) through FLIP-145 to replace the legacy grouped window syntax.
For example, before 1.13 we had to write Flink SQL like the following to perform a 10-second tumbling window aggregation:
SELECT TUMBLE_START(procTime, INTERVAL '10' SECONDS) AS window_start, merchandiseId, COUNT(1) AS sellCount
FROM rtdw_dwd.kafka_order_done_log
GROUP BY TUMBLE(procTime, INTERVAL '10' SECONDS), merchandiseId;
In 1.13, it can be rewritten as follows:
SELECT window_start, window_end, merchandiseId, COUNT(1) AS sellCount
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECONDS) )
GROUP BY window_start, window_end, merchandiseId;
According to the design document, the idea of window TVFs comes from a 2019 SIGMOD paper.
In the rest of this article we take a quick look at the aggregation logic built on window TVFs, and then make a small improvement to the cumulative window TVF.
SQL Definition
The class diagram of the window TVF functions is shown below.

On top of Calcite's native SqlWindowTableFunction, Flink SQL appends three columns that describe the window time: window_start, window_end and window_time. The main job of SqlWindowTableFunction and its subclasses is to validate that the TVF's operands are legal (via the inner abstract class AbstractOperandMetadata and its concrete subclass OperandMetadataImpl). We will not go into the details here; this part comes up again later in the code that improves the cumulative window TVF.
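To give a feel for what that validation does, the sketch below shows the kind of invocation the operand check rejects. It is illustrative only and assumes merchandiseId is not a timestamp column of the example table; the exact error message depends on the Flink version.

-- Illustrative only: DESCRIPTOR must reference a time column of the source table.
-- Assuming merchandiseId is not a timestamp column, planning fails during operand validation.
SELECT window_start, window_end, COUNT(1) AS cnt
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(merchandiseId), INTERVAL '10' SECONDS) )
GROUP BY window_start, window_end;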
Physical Plan
At the moment a window TVF cannot be used on its own; it must be combined with a window aggregation or window Top-N. Taking the aggregation above as an example, its execution plan is as follows.
EXPLAIN
SELECT window_start, window_end, merchandiseId, COUNT(1) AS sellCount
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECONDS) )
GROUP BY window_start, window_end, merchandiseId;
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0, 1, 2}], sellCount=[COUNT()])
+- LogicalProject(window_start=[$48], window_end=[$49], merchandiseId=[$10])
   +- LogicalTableFunctionScan(invocation=[TUMBLE($47, DESCRIPTOR($47), 10000:INTERVAL SECOND)], rowType=[RecordType(BIGINT ts, /* ...... */, TIMESTAMP_LTZ(3) *PROCTIME* procTime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
      +- LogicalProject(ts=[$0], /* ...... */, procTime=[PROCTIME()])
         +- LogicalTableScan(table=[[hive, rtdw_dwd, kafka_order_done_log]])

== Optimized Physical Plan ==
Calc(select=[window_start, window_end, merchandiseId, sellCount])
+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[merchandiseId]])
      +- Calc(select=[merchandiseId, PROCTIME() AS procTime])
         +- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])

== Optimized Execution Plan ==
Calc(select=[window_start, window_end, merchandiseId, sellCount])
+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[merchandiseId]])
      +- Calc(select=[merchandiseId, PROCTIME() AS procTime])
         +- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])
In the Flink SQL rule set, the rules relevant to the query above are, in order:
1. StreamPhysicalWindowTableFunctionRule (a ConverterRule): converts the logical node that invokes the window TVF (i.e. the LogicalTableFunctionScan node calling SqlWindowTableFunction) into the physical node StreamPhysicalWindowTableFunction.
2. StreamPhysicalWindowAggregateRule (a ConverterRule): converts the logical aggregation node FlinkLogicalAggregate containing the window_start and window_end fields into the physical window aggregation node StreamPhysicalWindowAggregate, plus a StreamPhysicalCalc projection on top of it. If there are other grouping fields, a StreamPhysicalExchange node is also generated based on FlinkRelDistribution#hash.
3. PullUpWindowTableFunctionIntoWindowAggregateRule (a RelOptRule): as its name suggests, this rule tidies up the RelNodes produced by the two rules above, removing the physical node that represents the window TVF and pulling its semantics up into the aggregation node, which yields the final physical plan.
The StreamPhysicalWindowAggregate node is then translated into a StreamExecWindowAggregate node, and we enter the execution phase.
Sliced Windows and Execution
We know that sliding windows with too fine a granularity blow up state and timers, which is risky; such cases are better served by tumbling windows plus an online store plus read-time aggregation. The community clearly had this in mind when designing window TVF aggregation: it introduced the concept of sliced windows and, on that basis, built a windowing mechanism that differs from DataStream API windowing.
As shown in the cumulative window in the figure below, the part between every two vertical dashed lines is a slice.

In essence, slicing turns sliding/cumulative windows into tumbling windows and reuses intermediate results as much as possible, reducing state pressure. Naturally, the Local-Global aggregation and Distinct hot-key-splitting optimizations mentioned earlier can then be applied seamlessly.
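As a concrete example, the following query (reusing the example table from above, with an assumed 1-minute step and 1-day maximum size chosen for illustration) defines a cumulative window; under the slicing scheme it is processed as 1-minute slices whose intermediate results are reused, rather than keeping separate state per emitted window.

-- Cumulative window over the example table: emits a running daily aggregate every minute.
-- The step (1 minute) corresponds to the slice size; the max size (1 day) is the full window.
SELECT window_start, window_end, merchandiseId, COUNT(1) AS sellCount
FROM TABLE( CUMULATE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '1' MINUTES, INTERVAL '1' DAY) )
GROUP BY window_start, window_end, merchandiseId;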
So how are slices assigned? Through the SliceAssigner hierarchy, whose class diagram is shown below.

CumulativeSliceAssigner has an extra isIncremental() method, which is part of the improvement described later. As the diagram shows, for a tumbling window a window is exactly one slice, whereas for sliding/cumulative windows a window may contain multiple slices and a slice may belong to multiple windows. Windows that share slices therefore need to take special care with slice expiration and merging. Take CumulativeSliceAssigner, which handles cumulative windows, as an example; the corresponding logic is as follows.
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
    long windowStart = getWindowStart(windowEnd);
    long firstSliceEnd = windowStart + step;
    long lastSliceEnd = windowStart + maxSize;
    if (windowEnd == firstSliceEnd) {
        // we share state in the first slice, skip cleanup for the first slice
        reuseExpiredList.clear();
    } else if (windowEnd == lastSliceEnd) {
        // when this is the last slice,
        // we need to cleanup the shared state (i.e. first slice) and the current slice
        reuseExpiredList.reset(windowEnd, firstSliceEnd);
    } else {
        // clean up current slice
        reuseExpiredList.reset(windowEnd);
    }
    return reuseExpiredList;
}

@Override
public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
    long windowStart = getWindowStart(sliceEnd);
    long firstSliceEnd = windowStart + step;
    if (sliceEnd == firstSliceEnd) {
        // if this is the first slice, there is nothing to merge
        reuseToBeMergedList.clear();
    } else {
        // otherwise, merge the current slice state into the first slice state
        reuseToBeMergedList.reset(sliceEnd);
    }
    callback.merge(firstSliceEnd, reuseToBeMergedList);
}
As we can see, the intermediate results of a cumulative window are merged into the first slice, and while the window is still open, every slice other than the first one expires once it has fired. For example, with a 1-minute step and a 1-day max size, when the slice ending at 00:02 fires, its state is merged into the slice ending at 00:01 and then expired; only when the last slice of the day fires is the shared first slice cleaned up as well.
The operator that actually processes sliced windows is SlicingWindowOperator, which is really just a thin wrapper around SlicingWindowProcessor. The SlicingWindowProcessor hierarchy is shown below.

The three important components of SlicingWindowProcessor are:
WindowBuffer: a buffer for window data allocated in managed memory, which avoids hitting state at high frequency before the window actually fires;
WindowValueState: the window state, whose schema is [key, window_end, accumulator]; the window end timestamp serves as the namespace of the window state;
NamespaceAggsHandleFunction: the aggregate function body generated by the code generator AggsHandlerCodeGenerator. Note that it is not an AggregateFunction, although it roughly follows the same contract.
Whenever a record arrives, AbstractWindowAggProcessor#processElement() is invoked. The method is fairly easy to follow.
@Override
public boolean processElement(RowData key, RowData element) throws Exception {
    long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
    if (!isEventTime) {
        // always register processing time for every element when processing time mode
        windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
    }
    if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
        // the assigned slice has been triggered, which means current element is late,
        // but maybe not need to drop
        long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
        if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
            // the last window has been triggered, so the element can be dropped now
            return true;
        } else {
            windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
            // we need to register a timer for the next unfired window,
            // because this may the first time we see elements under the key
            long unfiredFirstWindow = sliceEnd;
            while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
                unfiredFirstWindow += windowInterval;
            }
            windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
            return false;
        }
    } else {
        // the assigned slice hasn't been triggered, accumulate into the assigned slice
        windowBuffer.addElement(key, sliceEnd, element);
        return false;
    }
}
When slices need to be merged, the existing state is first read from WindowValueState, then the slices are iterated and merged via NamespaceAggsHandleFunction#merge(), and finally the state is updated.
@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
    // get base accumulator
    final RowData acc;
    if (mergeResult == null) {
        // null means the merged is not on state, create a new acc
        acc = aggregator.createAccumulators();
    } else {
        RowData stateAcc = windowState.value(mergeResult);
        if (stateAcc == null) {
            acc = aggregator.createAccumulators();
        } else {
            acc = stateAcc;
        }
    }
    // set base accumulator
    aggregator.setAccumulators(mergeResult, acc);
    // merge slice accumulators
    for (Long slice : toBeMerged) {
        RowData sliceAcc = windowState.value(slice);
        if (sliceAcc != null) {
            aggregator.merge(slice, sliceAcc);
        }
    }
    // set merged acc into state if the merged acc is on state
    if (mergeResult != null) {
        windowState.update(mergeResult, aggregator.getAccumulators());
    }
}
If you want to inspect the code of the generated aggregate functions, add the following to the log4j.properties file:
logger.codegen.name = org.apache.flink.table.runtime.generated
logger.codegen.level = DEBUG
A Small Improvement
There are many Flink jobs that aggregate at day granularity but emit results at second granularity. In the DataStream API era these were mostly implemented with ContinuousProcessingTimeTrigger; before 1.13, the SQL versions needed the table.exec.emit.early-fire family of options. After officially adopting 1.13, the cumulative window (cumulate window) fits this kind of requirement perfectly. However, some of these jobs have a large key space and, late in the day, keep flushing large amounts of data to the downstream Redis, causing unnecessary pressure. The author therefore made a small, slightly intrusive change to the cumulative window TVF: a boolean parameter INCREMENTAL controls whether to emit only the aggregation results that have changed between slices. The change is simple:
Modify the signature of the SqlCumulateTableFunction function, as well as the accompanying window parameter classes such as CumulativeWindowSpec;
Modify the SliceSharedWindowAggProcessor#fireWindow() method, as shown below.
@Override
public void fireWindow(Long windowEnd) throws Exception {
    sliceSharedAssigner.mergeSlices(windowEnd, this);
    // we have set accumulator in the merge() method
    RowData aggResult = aggregator.getValue(windowEnd);
    if (!isWindowEmpty()) {
        if (sliceSharedAssigner instanceof CumulativeSliceAssigner
                && ((CumulativeSliceAssigner) sliceSharedAssigner).isIncremental()) {
            RowData stateValue = windowState.value(windowEnd);
            if (stateValue == null || !stateValue.equals(aggResult)) {
                collect(aggResult);
            }
        } else {
            collect(aggResult);
        }
    }
    // we should register next window timer here,
    // because slices are shared, maybe no elements arrived for the next slices
    // ......
}
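With those changes in place, usage might look like the sketch below. This is purely hypothetical: the trailing boolean argument (the INCREMENTAL flag) is the author's private extension and does not exist in upstream Flink, and its exact position and form depend on how the SqlCumulateTableFunction signature was modified.

-- Hypothetical syntax for the modified cumulative window TVF (not upstream Flink):
-- the trailing TRUE is the custom INCREMENTAL flag, so only changed results are emitted.
SELECT window_start, window_end, merchandiseId, COUNT(1) AS sellCount
FROM TABLE( CUMULATE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '1' MINUTES, INTERVAL '1' DAY, TRUE) )
GROUP BY window_start, window_end, merchandiseId;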
Of course, this approach adds the overhead of extra state access. Follow-up stress testing will be done to evaluate the performance, with further adjustments as appropriate.
