A Brief Analysis of Mini-Batch Optimization in Flink SQL Streaming Aggregation
Preface
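Streaming aggregation is one of the most common scenarios when writing real-time business logic, and it is also prone to all kinds of performance problems. Flink SQL lets users implement streaming aggregation with simple aggregate functions and a GROUP BY clause, and it also ships with several built-in optimizations that address bottlenecks arising in certain cases. This article briefly introduces one of the most commonly used of them, Mini-Batch, and walks through its implementation in the source code.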
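Note: as of the current version, Flink SQL's streaming aggregation optimizations do not apply to window aggregation (i.e. GROUP BY TUMBLE/HOP/SESSION); they only take effect for aggregations over plain unbounded streams.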
Mini-Batch Overview
The Mini-Batch concept in Flink SQL is somewhat similar to that of Spark Streaming: micro-batch processing.
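By default, the aggregation operator performs a "read accumulator state → modify state → write state back" cycle for every record it ingests. When the data volume is large, the overhead of these state operations grows accordingly and hurts efficiency (especially with a backend such as RocksDB, where serialization is expensive). With Mini-Batch enabled, incoming records are first collected in a buffer inside the operator, and the aggregation logic runs only once the buffer reaches a configured size or time threshold. As a result, each key within a batch needs only one state read and one state write. The fewer distinct keys there are relative to the number of records (that is, the more records share a key within one batch), the more pronounced the benefit.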
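To make the state-access argument concrete, here is a minimal, Flink-free Java sketch that counts state reads and writes for a COUNT(*) per key, once in per-record mode and once in buffered mode. The class StateAccessDemo, its toy CountingState "backend" and the batch size are hypothetical; for brevity the buffer keeps a running count per key, whereas MiniBatchGroupAggFunction buffers the input rows of each key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Flink code: compares how often keyed state is
// touched by a per-record aggregator and by a buffered (mini-batch) aggregator.
class StateAccessDemo {

    /** A toy "state backend" that counts every read and write. */
    static final class CountingState {
        final Map<String, Long> store = new HashMap<>();
        int reads = 0;
        int writes = 0;

        Long get(String key) { reads++; return store.get(key); }
        void put(String key, long value) { writes++; store.put(key, value); }
    }

    /** Default mode: read-modify-write the accumulator for every record (COUNT(*) per key). */
    static CountingState perRecord(List<String> keys) {
        CountingState state = new CountingState();
        for (String key : keys) {
            Long acc = state.get(key);
            state.put(key, (acc == null ? 0L : acc) + 1);
        }
        return state;
    }

    /** Mini-batch mode: buffer batchSize records in heap, then one read/write per distinct key. */
    static CountingState miniBatch(List<String> keys, int batchSize) {
        CountingState state = new CountingState();
        Map<String, Long> buffer = new LinkedHashMap<>();
        int buffered = 0;
        for (String key : keys) {
            buffer.merge(key, 1L, Long::sum); // heap only, no state access
            if (++buffered == batchSize) {
                flush(buffer, state);
                buffered = 0;
            }
        }
        flush(buffer, state); // flush whatever is left in the last, partial batch
        return state;
    }

    private static void flush(Map<String, Long> buffer, CountingState state) {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            Long acc = state.get(e.getKey());
            state.put(e.getKey(), (acc == null ? 0L : acc) + e.getValue());
        }
        buffer.clear();
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "a", "b", "c", "a", "b");
        CountingState p = perRecord(keys);
        CountingState m = miniBatch(keys, 4);
        System.out.println("per-record: reads=" + p.reads + ", writes=" + p.writes);
        System.out.println("mini-batch: reads=" + m.reads + ", writes=" + m.writes);
    }
}
```

Running main prints `per-record: reads=8, writes=8` and `mini-batch: reads=5, writes=5`: the same eight records over three keys need one state round trip per record, but only one per distinct key per batch once they are grouped into two batches of four.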
The diagram below compares aggregation with Mini-Batch disabled and enabled.

Clearly, Mini-Batch introduces some latency into data processing, so users need to weigh freshness against throughput before deciding whether to use it.
Mini-Batch aggregation is disabled by default. To enable it, set the following three parameters.
val tEnv: TableEnvironment = ...
val configuration = tEnv.getConfig().getConfiguration()
configuration.setString("table.exec.mini-batch.enabled", "true")         // enable Mini-Batch
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")    // buffer timeout
configuration.setString("table.exec.mini-batch.size", "5000")            // buffer size (max number of buffered records)
After enabling Mini-Batch and running a simple unbounded-stream aggregation query, the JobGraph shown in the Web UI looks as follows.

Note that LocalGroupAggregate and GlobalGroupAggregate are the result of the Local-Global optimization, which builds on Mini-Batch; it is briefly explained after the analysis of plain Mini-Batch.
How Mini-Batch Works
Generating Watermarks
The optimizer rule behind Mini-Batch is MiniBatchIntervalInferRule (code omitted here), and the physical node it produces is StreamExecMiniBatchAssigner, which is attached directly after the Source node. The source code of its translateToPlanInternal() method is as follows.
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    final Transformation<RowData> inputTransform =
            (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
    // pick the assigner operator according to the job's time semantics
    final OneInputStreamOperator<RowData, RowData> operator;
    if (miniBatchInterval.mode() == MiniBatchMode.ProcTime()) {
        operator = new ProcTimeMiniBatchAssignerOperator(miniBatchInterval.interval());
    } else if (miniBatchInterval.mode() == MiniBatchMode.RowTime()) {
        operator = new RowTimeMiniBatchAssginerOperator(miniBatchInterval.interval());
    } else {
        throw new TableException(
                String.format(
                        "MiniBatchAssigner shouldn't be in %s mode this is a bug, please file an issue.",
                        miniBatchInterval.mode()));
    }
    // same parallelism as the input, so the assigner chains right after the source
    return new OneInputTransformation<>(
            inputTransform,
            getDescription(),
            operator,
            InternalTypeInfo.of(getOutputType()),
            inputTransform.getParallelism());
}
As shown, the operator created differs depending on the job's time semantics (both are OneInputStreamOperator implementations). Let's first look at the relevant methods of ProcTimeMiniBatchAssignerOperator, the operator created under processing-time semantics.
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
    long now = getProcessingTimeService().getCurrentProcessingTime();
    // align the current processing time down to the start of its batch
    long currentBatch = now - now % intervalMs;
    if (currentBatch > currentWatermark) {
        currentWatermark = currentBatch;
        // emit
        output.emitWatermark(new Watermark(currentBatch));
    }
    output.collect(element);
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
    long now = getProcessingTimeService().getCurrentProcessingTime();
    long currentBatch = now - now % intervalMs;
    if (currentBatch > currentWatermark) {
        currentWatermark = currentBatch;
        // emit
        output.emitWatermark(new Watermark(currentBatch));
    }
    // schedule the next check at the start of the next batch
    getProcessingTimeService().registerTimer(currentBatch + intervalMs, this);
}
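Watermarks are not normally needed under processing-time semantics, but the approach here is quite clever: watermarks are borrowed as markers that separate batches. For every record processed, the operator aligns the current processing time down to a multiple of the interval (now - now % intervalMs) and checks whether that batch start is later than the last emitted watermark; if a new batch has begun, it emits a new watermark. In addition, a timer is registered so that watermarks are emitted even when no records arrive, which keeps the emission period equal to the interval given by the table.exec.mini-batch.allow-latency parameter described above.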
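The batch-alignment arithmetic can be checked in isolation. The following is a simplified Java sketch that uses nothing from Flink: ProcTimeBatchMarker is a hypothetical class whose observe() method mirrors the shared body of processElement() and onProcessingTime() above, with a list standing in for output.emitWatermark(), and the initial watermark of 0 is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free restatement of the ProcTimeMiniBatchAssignerOperator logic.
class ProcTimeBatchMarker {
    private final long intervalMs;
    // Assumed to start at 0, i.e. no batch marker emitted yet.
    private long currentWatermark = 0L;
    // Collected "watermarks"; stands in for output.emitWatermark(...).
    final List<Long> emitted = new ArrayList<>();

    ProcTimeBatchMarker(long intervalMs) { this.intervalMs = intervalMs; }

    /** Aligns a processing time down to the start of its batch, e.g. 12345 -> 10000 for 5000 ms. */
    long batchStart(long now) { return now - now % intervalMs; }

    /** Shared body of processElement() and onProcessingTime(): emit a marker once per new batch. */
    void observe(long now) {
        long currentBatch = batchStart(now);
        if (currentBatch > currentWatermark) {
            currentWatermark = currentBatch;
            emitted.add(currentBatch);
        }
    }

    public static void main(String[] args) {
        ProcTimeBatchMarker marker = new ProcTimeBatchMarker(5000);
        // Records at 1200, 4999, 5001, 7000 and 12345 ms, plus a timer firing at 10000 ms.
        for (long t : new long[] {1200, 4999, 5001, 7000, 10000, 12345}) {
            marker.observe(t);
        }
        System.out.println(marker.emitted); // [5000, 10000]
    }
}
```

Observations inside the same 5-second batch produce no marker; only the first observation of a new batch (a record or the timer at 10000 ms) emits one, which is what lets downstream operators treat each watermark as a batch boundary.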
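The idea under event-time semantics is the same: the operator only inspects the timestamps of the watermarks produced by the Source and forwards only those that reach the end of the current batch interval; watermarks that fall inside the interval are not passed downstream. The corresponding code in RowTimeMiniBatchAssginerOperator is shown below.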
@Override
public void processWatermark(Watermark mark) throws Exception {
    // if we receive a Long.MAX_VALUE watermark we forward it since it is used
    // to signal the end of input and to not block watermark progress downstream
    if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
        currentWatermark = Long.MAX_VALUE;
        output.emitWatermark(mark);
        return;
    }
    currentWatermark = Math.max(currentWatermark, mark.getTimestamp());
    if (currentWatermark >= nextWatermark) {
        advanceWatermark();
    }
}
private void advanceWatermark() {
    output.emitWatermark(new Watermark(currentWatermark));
    long start = getMiniBatchStart(currentWatermark, minibatchInterval);
    long end = start + minibatchInterval - 1;
    nextWatermark = end > currentWatermark ? end : end + minibatchInterval;
}
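The effect of advanceWatermark() is easier to see on a concrete watermark sequence. Below is a Flink-free Java sketch; RowTimeBatchFilter is a hypothetical class, the initial values of currentWatermark and nextWatermark (0 and interval - 1) are an assumption about how the operator is opened, and miniBatchStart() assumes a zero-offset alignment formula for getMiniBatchStart().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free restatement of the RowTimeMiniBatchAssginerOperator watermark filter.
class RowTimeBatchFilter {
    private final long interval;
    private long currentWatermark;
    private long nextWatermark;
    // Watermarks that would be forwarded downstream.
    final List<Long> forwarded = new ArrayList<>();

    RowTimeBatchFilter(long interval) {
        this.interval = interval;
        // Assumed initial state: the first batch is [0, interval - 1].
        this.currentWatermark = 0L;
        this.nextWatermark = miniBatchStart(0L, interval) + interval - 1;
    }

    /** Start of the mini-batch that contains the given watermark (assumed formula). */
    static long miniBatchStart(long watermark, long interval) {
        return watermark - (watermark + interval) % interval;
    }

    void onWatermark(long ts) {
        // End-of-input watermark is always forwarded so downstream progress is not blocked.
        if (ts == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            forwarded.add(ts);
            return;
        }
        currentWatermark = Math.max(currentWatermark, ts);
        if (currentWatermark >= nextWatermark) {
            // Same as advanceWatermark(): forward, then move nextWatermark to the batch end.
            forwarded.add(currentWatermark);
            long start = miniBatchStart(currentWatermark, interval);
            long end = start + interval - 1;
            nextWatermark = end > currentWatermark ? end : end + interval;
        }
    }

    public static void main(String[] args) {
        RowTimeBatchFilter filter = new RowTimeBatchFilter(5000);
        for (long wm : Arrays.asList(1000L, 3000L, 4999L, 6000L, 10500L, 12000L, Long.MAX_VALUE)) {
            filter.onWatermark(wm);
        }
        System.out.println(filter.forwarded);
    }
}
```

With a 5000 ms interval this prints `[4999, 10500, 9223372036854775807]`: 1000, 3000, 6000 and 12000 are swallowed, 4999 closes the first batch, 10500 jumps past 9999 and closes the second (moving nextWatermark to 14999), and the end-of-input watermark always passes.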
Buffering Batches
StreamExecGroupAggregate, the physical node that implements group aggregation, handles the case where Mini-Batch is enabled specially.
final OneInputStreamOperator<RowData, RowData> operator;
if (isMiniBatchEnabled) {
    // Mini-Batch: buffer records per key and aggregate them bundle by bundle
    MiniBatchGroupAggFunction aggFunction =
            new MiniBatchGroupAggFunction(
                    aggsHandler,
                    recordEqualiser,
                    accTypes,
                    inputRowType,
                    inputCountIndex,
                    generateUpdateBefore,
                    tableConfig.getIdleStateRetention().toMillis());
    operator =
            new KeyedMapBundleOperator<>(
                    aggFunction, AggregateUtil.createMiniBatchTrigger(tableConfig));
} else {
    // default: process every record individually
    GroupAggFunction aggFunction = new GroupAggFunction(/*...*/);
    operator = new KeyedProcessOperator<>(aggFunction);
}
As shown, the operator that buffers batches is KeyedMapBundleOperator, and the corresponding Function is MiniBatchGroupAggFunction. Let's start with the former: its abstract base class has the following three important fields.
/** The map in heap to store elements. */
private transient Map<K, V> bundle;
/** The trigger that determines how many elements should be put into a bundle. */
private final BundleTrigger<IN> bundleTrigger;
/** The function used to process when receiving element. */
private final MapBundleFunction<K, V, IN, OUT> function;
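bundle: the buffer that temporarily holds incoming data.
bundleTrigger: similar to CountTrigger, it triggers the computation once the number of records in the bundle reaches the threshold (the table.exec.mini-batch.size parameter mentioned above). Its source is simple, so it is not shown here.
function: the MiniBatchGroupAggFunction, which carries the actual computation logic.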
The corresponding processing methods in the operator are as follows.
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // get the key and value for the map bundle
    final IN input = element.getValue();
    final K bundleKey = getKey(input);
    final V bundleValue = bundle.get(bundleKey);
    // get a new value after adding this element to bundle
    final V newBundleValue = function.addInput(bundleValue, input);
    // update to map bundle
    bundle.put(bundleKey, newBundleValue);
    numOfElements++;
    bundleTrigger.onElement(input);
}
@Override
public void finishBundle() throws Exception {
    if (!bundle.isEmpty()) {
        numOfElements = 0;
        function.finishBundle(bundle, collector);
        bundle.clear();
    }
    bundleTrigger.reset();
}
@Override
public void processWatermark(Watermark mark) throws Exception {
    // flush the pending bundle before forwarding the watermark
    finishBundle();
    super.processWatermark(mark);
}
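Each incoming record is added to the bundle, the element count is incremented, and BundleTrigger#onElement() is called to check whether the trigger threshold has been reached; if so, the trigger calls back finishBundle() to process the completed batch and clear the bundle. The same happens when a watermark arrives: because the upstream MiniBatchAssigner emits one watermark per batch interval, this is what enforces the batch timeout.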
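The buffering-and-flushing flow just described can be sketched without Flink. MiniMapBundle is a hypothetical class: its addInput and finish callbacks play the roles of MapBundleFunction#addInput() and #finishBundle(), the maxCount threshold stands in for the count trigger configured by table.exec.mini-batch.size, and processWatermark() models the flush caused by a MiniBatchAssigner watermark.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the map-bundle pattern; not the Flink KeyedMapBundleOperator API.
class MiniMapBundle<K, V, IN> {
    private final Map<K, V> bundle = new LinkedHashMap<>();  // in-heap buffer
    private final Function<IN, K> keySelector;
    private final BiFunction<V, IN, V> addInput;            // like MapBundleFunction#addInput
    private final Consumer<Map<K, V>> finish;               // like MapBundleFunction#finishBundle
    private final long maxCount;                            // like table.exec.mini-batch.size
    private long numOfElements = 0;

    MiniMapBundle(Function<IN, K> keySelector, BiFunction<V, IN, V> addInput,
                  Consumer<Map<K, V>> finish, long maxCount) {
        this.keySelector = keySelector;
        this.addInput = addInput;
        this.finish = finish;
        this.maxCount = maxCount;
    }

    void processElement(IN input) {
        K key = keySelector.apply(input);
        bundle.put(key, addInput.apply(bundle.get(key), input));
        if (++numOfElements >= maxCount) {  // count threshold reached
            finishBundle();
        }
    }

    /** A watermark from the upstream assigner closes the current batch. */
    void processWatermark() {
        finishBundle();
    }

    private void finishBundle() {
        if (!bundle.isEmpty()) {
            finish.accept(new LinkedHashMap<>(bundle));  // hand over a copy, then clear
            bundle.clear();
        }
        numOfElements = 0;
    }

    public static void main(String[] args) {
        List<Map<String, List<Integer>>> flushed = new ArrayList<>();
        MiniMapBundle<String, List<Integer>, Integer> op = new MiniMapBundle<String, List<Integer>, Integer>(
                v -> v % 2 == 0 ? "even" : "odd",  // hypothetical key selector
                (buf, v) -> {                      // buffer raw inputs per key
                    List<Integer> list = buf == null ? new ArrayList<>() : buf;
                    list.add(v);
                    return list;
                },
                flushed::add,
                3);
        op.processElement(1);
        op.processElement(2);
        op.processElement(3);   // third element: count threshold fires
        op.processElement(4);
        op.processWatermark();  // watermark flushes the partial batch
        System.out.println(flushed); // [{odd=[1, 3], even=[2]}, {even=[4]}]
    }
}
```

The first bundle is flushed by the count threshold and the second, partial one by the watermark, mirroring the two ways a batch can close.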
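finishBundle() actually delegates to MiniBatchGroupAggFunction#finishBundle(). That code is rather long and readers can look it up themselves, but the flow is simple: for each key in the bundle, it reads the accumulator from state (creating a new accumulator instance if none exists), then accumulates or retracts each buffered record according to its RowKind (updating the state kept for that key), and finally emits the changelog of the batch's aggregation results. Note that MiniBatchGroupAggFunction relies on code generation for the underlying aggregate handler (AggsHandleFunction), a technique that is very common in the Flink Table module.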
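The per-key flow of finishBundle() can be illustrated with a hand-written SUM aggregate. This is a simplified, hypothetical Java sketch: MiniBatchSumSketch, its Kind enum (modeled on Flink's RowKind) and the long[] {sum, count} accumulator are stand-ins, the HashMap plays the role of keyed accumulator state, retractions with nothing to retract are simply ignored, and the generated AggsHandleFunction, the record equaliser and state-retention handling of the real function are all omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of a mini-batch SUM; not Flink's MiniBatchGroupAggFunction.
class MiniBatchSumSketch {
    /** Modeled on Flink's RowKind: +I, -U, +U, -D. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** A changelog row: kind, grouping key and value (input value or aggregated SUM). */
    static final class Change {
        final Kind kind;
        final String key;
        final long value;

        Change(Kind kind, String key, long value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() { return kind + "(" + key + "," + value + ")"; }
    }

    /** Stand-in for keyed accumulator state: key -> {sum, rowCount}. */
    final Map<String, long[]> accState = new HashMap<>();
    private final boolean generateUpdateBefore;

    MiniBatchSumSketch(boolean generateUpdateBefore) { this.generateUpdateBefore = generateUpdateBefore; }

    /** bundle: key -> all input rows buffered for that key in this batch. */
    List<Change> finishBundle(Map<String, List<Change>> bundle) {
        List<Change> out = new ArrayList<>();
        for (Map.Entry<String, List<Change>> entry : bundle.entrySet()) {
            String key = entry.getKey();
            long[] acc = accState.get(key);              // one state read per key
            boolean firstRow = acc == null;
            if (firstRow) {
                acc = new long[] {0L, 0L};               // like createAccumulators()
            }
            long prevSum = acc[0];
            for (Change in : entry.getValue()) {
                if (in.kind == Kind.INSERT || in.kind == Kind.UPDATE_AFTER) {
                    acc[0] += in.value;                  // accumulate
                    acc[1]++;
                } else if (acc[1] > 0) {                 // ignore retractions with nothing to retract
                    acc[0] -= in.value;                  // retract
                    acc[1]--;
                }
            }
            if (acc[1] == 0) {                           // no rows left for this key
                if (!firstRow) {
                    out.add(new Change(Kind.DELETE, key, prevSum));
                }
                accState.remove(key);
                continue;
            }
            accState.put(key, acc);                      // one state write per key
            if (firstRow) {
                out.add(new Change(Kind.INSERT, key, acc[0]));
            } else if (acc[0] != prevSum) {              // skip updates that change nothing
                if (generateUpdateBefore) {
                    out.add(new Change(Kind.UPDATE_BEFORE, key, prevSum));
                }
                out.add(new Change(Kind.UPDATE_AFTER, key, acc[0]));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        MiniBatchSumSketch agg = new MiniBatchSumSketch(true);
        Map<String, List<Change>> batch = new LinkedHashMap<>();
        batch.put("a", Arrays.asList(new Change(Kind.INSERT, "a", 3), new Change(Kind.INSERT, "a", 4)));
        System.out.println(agg.finishBundle(batch)); // [INSERT(a,7)]
        batch.clear();
        batch.put("a", Arrays.asList(new Change(Kind.INSERT, "a", 5)));
        System.out.println(agg.finishBundle(batch)); // [UPDATE_BEFORE(a,7), UPDATE_AFTER(a,12)]
    }
}
```

Two inserts for the same key inside one batch produce a single +I row rather than +I followed by -U/+U, which is the changelog-level counterpart of the saved state accesses.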
A Brief Look at Local-Global
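Local-Global is essentially an optimization that automatically applies two-phase aggregation to mitigate data skew (which is quite convenient), similar to introducing a Combiner in MapReduce. Without further ado, here is the diagram from the official documentation.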

To enable Local-Global aggregation, set the following parameter in addition to enabling Mini-Batch.
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")
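The optimizer rule behind Local-Global is TwoStageOptimizedAggregateRule, and the physical nodes it produces are StreamExecLocalGroupAggregate (local aggregation) and StreamExecGlobalGroupAggregate (global aggregation). Their respective translateToPlanInternal() methods also use code generation, here to produce the aggregate handlers used by the corresponding functions MiniBatchLocalGroupAggFunction and MiniBatchGlobalGroupAggFunction. There is quite a lot of code, but the logic is just as clear, so interested readers can look it up.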
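The two-phase idea behind Local-Global can be sketched for COUNT in plain Java. TwoPhaseCountSketch and its methods are hypothetical: each local task pre-aggregates its own batch into one partial count per key, and only those partials are shuffled by key to the global task, which merges them. This only works for aggregates whose partial results can be merged, which is why COUNT is used here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of two-phase (local-global) COUNT aggregation, not Flink code.
class TwoPhaseCountSketch {
    /** Local phase: one partial accumulator per key within a single upstream task's batch. */
    static Map<String, Long> localAggregate(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String k : keys) {
            partial.merge(k, 1L, Long::sum);
        }
        return partial;
    }

    /** Global phase: merge the partial accumulators coming from all local tasks. */
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    /** Number of records that would be shuffled from the local to the global phase. */
    static int shuffledRecords(List<Map<String, Long>> partials) {
        int n = 0;
        for (Map<String, Long> p : partials) {
            n += p.size();
        }
        return n;
    }

    public static void main(String[] args) {
        List<String> task1 = Arrays.asList("hot", "hot", "hot", "x");
        List<String> task2 = Arrays.asList("hot", "hot", "y");
        List<Map<String, Long>> partials = Arrays.asList(localAggregate(task1), localAggregate(task2));
        System.out.println("records shuffled: " + shuffledRecords(partials)
                + " instead of " + (task1.size() + task2.size()));
        System.out.println("hot = " + globalAggregate(partials).get("hot"));
    }
}
```

This prints `records shuffled: 4 instead of 7` and `hot = 5`: the skewed key "hot" reaches the global task as two partial counts instead of five raw records.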
