
A Brief Analysis of the Mini-Batch Optimization for Flink SQL Streaming Aggregation


          2022-03-06 00:19

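Preface

Streaming aggregation is a very common scenario when writing real-time business logic, and it is also prone to all kinds of performance problems. Flink SQL lets users implement streaming aggregation with plain aggregate functions and a GROUP BY clause, and it ships with several built-in optimizations that address the bottlenecks seen in some cases. This article gives a brief introduction to one of the most commonly used, Mini-Batch, and looks at how it is implemented in the source code.

Note: as of the Flink version current at the time of writing, the streaming aggregation optimizations in Flink SQL do not apply to window aggregation (that is, GROUP BY TUMBLE/HOP/SESSION); they only apply to aggregation over purely unbounded streams.

Mini-Batch Overview

The Mini-Batch concept in Flink SQL is somewhat similar to the one in Spark Streaming, namely micro-batch processing.

By default, the aggregation operator performs a "read accumulator state → update state → write state back" cycle for every record it ingests. When traffic is heavy, the overhead of these state operations grows with it and hurts efficiency (especially with a state backend such as RocksDB, where serialization is expensive). With Mini-Batch enabled, ingested records are collected in a buffer inside the operator, and the aggregation logic runs only once a configured size or time threshold is reached. As a result, each key in a batch needs only one state read and one state write. The fewer distinct keys a batch contains relative to its record count, the greater the benefit.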
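
To make the saving concrete, here is a minimal sketch that is not Flink code. It assumes a plain HashMap standing in for keyed state and a simple count as the aggregation; the class and method names (StateAccessSketch, perRecordAccesses, miniBatchAccesses) are hypothetical. It only counts how many state reads and writes each strategy would issue.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: counts simulated state accesses; none of this is Flink API. */
final class StateAccessSketch {

    /** Per-record path: one state read and one state write for every input record. */
    static int perRecordAccesses(List<String> keys) {
        Map<String, Long> state = new HashMap<>(); // stands in for keyed state
        int accesses = 0;
        for (String key : keys) {
            long acc = state.getOrDefault(key, 0L); // state read
            accesses++;
            state.put(key, acc + 1);                // state write
            accesses++;
        }
        return accesses;
    }

    /** Mini-batch path: buffer on the heap, then one read and one write per distinct key per batch. */
    static int miniBatchAccesses(List<String> keys, int batchSize) {
        Map<String, Long> state = new HashMap<>();
        Map<String, Long> buffer = new HashMap<>(); // in-memory bundle, no state access
        int accesses = 0;
        int buffered = 0;
        for (String key : keys) {
            buffer.merge(key, 1L, Long::sum);
            buffered++;
            if (buffered >= batchSize) {            // size threshold reached
                accesses += flush(buffer, state);
                buffered = 0;
            }
        }
        accesses += flush(buffer, state);           // flush whatever is left
        return accesses;
    }

    private static int flush(Map<String, Long> buffer, Map<String, Long> state) {
        int accesses = 0;
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            long acc = state.getOrDefault(entry.getKey(), 0L); // one read per key
            accesses++;
            state.put(entry.getKey(), acc + entry.getValue()); // one write per key
            accesses++;
        }
        buffer.clear();
        return accesses;
    }
}
```

For the six records a, b, a, a, b, a, the per-record path issues 12 state accesses, while a batch size of 3 issues 8 and a batch size of 6 issues 4.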

The figure below compares aggregation with Mini-Batch disabled and enabled.

Clearly, Mini-Batch adds some latency to data processing, so users need to weigh timeliness against throughput before deciding whether to enable it.

Mini-Batch aggregation is disabled by default. To enable it, set the following three parameters.

val tEnv: TableEnvironment = ...
val configuration = tEnv.getConfig().getConfiguration()

configuration.setString("table.exec.mini-batch.enabled", "true")         // enable Mini-Batch
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")    // maximum time records may be buffered
configuration.setString("table.exec.mini-batch.size", "5000")            // maximum number of buffered records

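With Mini-Batch enabled, run a simple aggregation query over an unbounded stream; the JobGraph displayed in the Web UI looks as follows.

Note that LocalGroupAggregate and GlobalGroupAggregate are the result of the Local-Global optimization, which builds on Mini-Batch. It is described briefly after the analysis of plain Mini-Batch.

How Mini-Batch Works

Generating Watermarks

The optimizer rule behind Mini-Batch is named MiniBatchIntervalInferRule (code omitted here). The physical node it produces is StreamExecMiniBatchAssigner, which is attached directly after the Source node. The source of its translateToPlanInternal() method is shown below.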

@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    final Transformation<RowData> inputTransform =
            (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
    final OneInputStreamOperator<RowData, RowData> operator;

    if (miniBatchInterval.mode() == MiniBatchMode.ProcTime()) {
        operator = new ProcTimeMiniBatchAssignerOperator(miniBatchInterval.interval());
    } else if (miniBatchInterval.mode() == MiniBatchMode.RowTime()) {
        operator = new RowTimeMiniBatchAssginerOperator(miniBatchInterval.interval());
    } else {
        throw new TableException(
                String.format(
                        "MiniBatchAssigner shouldn't be in %s mode this is a bug, please file an issue.",
                        miniBatchInterval.mode()));
    }

    return new OneInputTransformation<>(
            inputTransform,
            getDescription(),
            operator,
            InternalTypeInfo.of(getOutputType()),
            inputTransform.getParallelism());
}

As shown, the operator that gets created depends on the job's time semantics (both are ultimately a OneInputStreamOperator). Let's first look at the relevant methods of ProcTimeMiniBatchAssignerOperator, the operator created under processing-time semantics.

@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
    long now = getProcessingTimeService().getCurrentProcessingTime();
    long currentBatch = now - now % intervalMs;
    if (currentBatch > currentWatermark) {
        currentWatermark = currentBatch;
        // emit
        output.emitWatermark(new Watermark(currentBatch));
    }
    output.collect(element);
}

@Override
public void onProcessingTime(long timestamp) throws Exception {
    long now = getProcessingTimeService().getCurrentProcessingTime();
    long currentBatch = now - now % intervalMs;
    if (currentBatch > currentWatermark) {
        currentWatermark = currentBatch;
        // emit
        output.emitWatermark(new Watermark(currentBatch));
    }
    getProcessingTimeService().registerTimer(currentBatch + intervalMs, this);
}

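Processing-time semantics do not normally need watermarks, but the approach here is clever: it borrows watermarks as markers that separate batches. For every record processed, the operator checks whether the current processing time still falls within the current batch; if a new batch has begun, it emits a new watermark. It also registers a timer that emits watermarks, which guarantees that the emission period matches the interval set by the table.exec.mini-batch.allow-latency parameter described above.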
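
The boundary arithmetic can be tried in isolation. The following is a minimal sketch under stated assumptions rather than the Flink operator: the class name ProcTimeBatchMarkerSketch and the method onTime are hypothetical, the clock value is passed in explicitly instead of being read from a processing-time service, and the watermark is assumed to start at 0.

```java
import java.util.OptionalLong;

/** Illustrative only: reproduces the "now - now % intervalMs" batch-boundary check. */
final class ProcTimeBatchMarkerSketch {
    private final long intervalMs;
    private long currentWatermark = 0L; // assumed initial value

    ProcTimeBatchMarkerSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /**
     * Returns the batch-boundary watermark that would be emitted at processing time
     * {@code now}, or an empty value when {@code now} is still inside the current batch.
     */
    OptionalLong onTime(long now) {
        long currentBatch = now - now % intervalMs; // start of the batch containing `now`
        if (currentBatch > currentWatermark) {
            currentWatermark = currentBatch;
            return OptionalLong.of(currentBatch);
        }
        return OptionalLong.empty();
    }
}
```

With an interval of 5000 ms, a call at time 12300 yields the marker 10000, a call at 14999 yields nothing because it belongs to the same batch, and a call at 15000 yields the marker 15000.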

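The idea is the same under event-time semantics: the operator only needs to inspect the timestamps of the watermarks produced by the Source and forward those that match the period. Watermarks that do not match the period are not passed downstream. The corresponding code in the RowTimeMiniBatchAssginerOperator class is shown below.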

@Override
public void processWatermark(Watermark mark) throws Exception {
    // if we receive a Long.MAX_VALUE watermark we forward it since it is used
    // to signal the end of input and to not block watermark progress downstream
    if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
        currentWatermark = Long.MAX_VALUE;
        output.emitWatermark(mark);
        return;
    }
    currentWatermark = Math.max(currentWatermark, mark.getTimestamp());
    if (currentWatermark >= nextWatermark) {
        advanceWatermark();
    }
}

private void advanceWatermark() {
    output.emitWatermark(new Watermark(currentWatermark));
    long start = getMiniBatchStart(currentWatermark, minibatchInterval);
    long end = start + minibatchInterval - 1;
    nextWatermark = end > currentWatermark ? end : end + minibatchInterval;
}
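
To see which watermarks survive this filter, here is a minimal simulation sketch. It rests on two assumptions that the excerpt above does not show: getMiniBatchStart is taken to round the watermark down to a multiple of the interval, and nextWatermark is taken to start at interval - 1. The class RowTimeBatchMarkerSketch and its methods are hypothetical, and emitted watermarks are simply collected in a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: mimics the forwarding logic of the row-time assigner on plain longs. */
final class RowTimeBatchMarkerSketch {
    private final long interval;
    private long currentWatermark = 0L;
    private long nextWatermark;                        // next boundary that triggers forwarding
    private final List<Long> emitted = new ArrayList<>();

    RowTimeBatchMarkerSketch(long interval) {
        this.interval = interval;
        this.nextWatermark = interval - 1;             // ASSUMPTION: initial boundary
    }

    void processWatermark(long timestamp) {
        // Always forward the end-of-input watermark once.
        if (timestamp == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            emitted.add(timestamp);
            return;
        }
        currentWatermark = Math.max(currentWatermark, timestamp);
        if (currentWatermark >= nextWatermark) {
            advanceWatermark();
        }
    }

    private void advanceWatermark() {
        emitted.add(currentWatermark);
        long start = batchStart(currentWatermark, interval);
        long end = start + interval - 1;
        nextWatermark = end > currentWatermark ? end : end + interval;
    }

    // ASSUMPTION: floors the watermark to a multiple of the interval.
    private static long batchStart(long watermark, long interval) {
        return watermark - Math.floorMod(watermark, interval);
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

With an interval of 1000 and incoming watermarks 500, 999, 1200, 2500, 2600, 2999 and Long.MAX_VALUE, the sketch forwards only 999, 2500, 2999 and Long.MAX_VALUE.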

Buffering and Batch Processing

StreamExecGroupAggregate, the physical node that implements group aggregation, handles the case where Mini-Batch is enabled specially.

final OneInputStreamOperator<RowData, RowData> operator;
if (isMiniBatchEnabled) {
    MiniBatchGroupAggFunction aggFunction =
            new MiniBatchGroupAggFunction(
                    aggsHandler,
                    recordEqualiser,
                    accTypes,
                    inputRowType,
                    inputCountIndex,
                    generateUpdateBefore,
                    tableConfig.getIdleStateRetention().toMillis());
    operator =
            new KeyedMapBundleOperator<>(
                    aggFunction, AggregateUtil.createMiniBatchTrigger(tableConfig));
} else {
    GroupAggFunction aggFunction = new GroupAggFunction(/*...*/);
    operator = new KeyedProcessOperator<>(aggFunction);
}

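As shown, the operator generated for buffering and batch processing is KeyedMapBundleOperator, and the corresponding function is MiniBatchGroupAggFunction. Let's look at the operator first. Its abstract base class has the following three important fields.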

/** The map in heap to store elements. */
private transient Map<K, V> bundle;
/** The trigger that determines how many elements should be put into a bundle. */
private final BundleTrigger<IN> bundleTrigger;
/** The function used to process when receiving element. */
private final MapBundleFunction<K, V, IN, OUT> function;
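
• bundle: the buffer used to hold records temporarily.
• bundleTrigger: similar to CountTrigger, it triggers computation when the number of records in the bundle reaches the threshold (the table.exec.mini-batch.size parameter described above). Its source is simple and is not reproduced here.
• function: the MiniBatchGroupAggFunction, which carries the actual computation logic.

The corresponding processing methods in the operator are shown below.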

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    // get the key and value for the map bundle
    final IN input = element.getValue();
    final K bundleKey = getKey(input);
    final V bundleValue = bundle.get(bundleKey);
    // get a new value after adding this element to bundle
    final V newBundleValue = function.addInput(bundleValue, input);
    // update to map bundle
    bundle.put(bundleKey, newBundleValue);
    numOfElements++;
    bundleTrigger.onElement(input);
}

@Override
public void finishBundle() throws Exception {
    if (!bundle.isEmpty()) {
        numOfElements = 0;
        function.finishBundle(bundle, collector);
        bundle.clear();
    }
    bundleTrigger.reset();
}

@Override
public void processWatermark(Watermark mark) throws Exception {
    finishBundle();
    super.processWatermark(mark);
}

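Each incoming record is added to the bundle and the element count is incremented, then BundleTrigger#onElement() is called to check whether the trigger threshold has been reached. If it has, finishBundle() is called back to process the completed batch and clear the bundle. The same happens whenever a watermark arrives, which is how the batch timeout setting is honored.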
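
The interplay of the two flush conditions can be sketched without any Flink dependency. The sketch below is a simplification under stated assumptions: CountBundleSketch and its methods are hypothetical names, the bundle holds a running sum per key instead of the buffered input rows, the count check is inlined rather than delegated to a trigger object, and flushed batches are collected in a list instead of being handed to a function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a bundle that is flushed by element count or by an arriving watermark. */
final class CountBundleSketch {
    private final int maxCount;                                  // plays the role of the size threshold
    private final Map<String, Long> bundle = new HashMap<>();    // in-heap buffer
    private final List<Map<String, Long>> flushed = new ArrayList<>();
    private int numOfElements = 0;

    CountBundleSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    void processElement(String key, long value) {
        bundle.merge(key, value, Long::sum);                     // simplified "addInput"
        numOfElements++;
        if (numOfElements >= maxCount) {                         // count-based trigger fires
            finishBundle();
        }
    }

    /** A watermark marks the end of a time-based batch, so it flushes as well. */
    void processWatermark() {
        finishBundle();
    }

    private void finishBundle() {
        if (!bundle.isEmpty()) {
            flushed.add(new HashMap<>(bundle));                  // hand a copy of the batch downstream
            bundle.clear();
        }
        numOfElements = 0;
    }

    List<Map<String, Long>> flushed() {
        return flushed;
    }
}
```

With a threshold of 3, the records (a, 1), (b, 2), (a, 3) produce one flushed batch {a=4, b=2}; a following record (c, 5) stays buffered until a watermark arrives and flushes {c=5}. A watermark that finds an empty bundle flushes nothing.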

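The finishBundle() method actually delegates to MiniBatchGroupAggFunction#finishBundle(). That code is rather long, so readers can look it up themselves, but the flow is simple: first create an accumulator instance, then perform an accumulate or retract operation according to the RowKind of each input record (while maintaining the state for each key), and finally emit the changelog of the batch's aggregation result. It is worth noting that MiniBatchGroupAggFunction relies on code generation to produce the underlying handler of the aggregate functions (namely AggsHandleFunction), a technique that is common in the Flink Table module.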
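
The flow just described can be approximated as follows. This is a heavily simplified sketch and not the Flink implementation: FinishBundleSketch and Row are hypothetical names, the aggregation is a plain SUM, a boolean flag replaces RowKind, a HashMap replaces keyed state, and details such as deleting a key whose record count drops to zero or state retention are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: one state read and one state write per key when a bundle is finished. */
final class FinishBundleSketch {

    /** A buffered input row: a value plus a flag saying whether it retracts an earlier row. */
    static final class Row {
        final long value;
        final boolean retract;

        Row(long value, boolean retract) {
            this.value = value;
            this.retract = retract;
        }
    }

    private final Map<String, Long> state = new HashMap<>(); // stands in for per-key accumulator state

    /** Processes one bundle and returns the changelog it would emit, as strings. */
    List<String> finishBundle(Map<String, List<Row>> bundle) {
        List<String> changelog = new ArrayList<>();
        for (Map.Entry<String, List<Row>> entry : bundle.entrySet()) {
            String key = entry.getKey();
            Long previous = state.get(key);                  // single state read for this key
            long acc = previous == null ? 0L : previous;     // new accumulator if the key is unseen
            for (Row row : entry.getValue()) {
                acc += row.retract ? -row.value : row.value; // retract or accumulate
            }
            state.put(key, acc);                             // single state write for this key
            if (previous == null) {
                changelog.add("+I[" + key + ", " + acc + "]");
            } else if (previous != acc) {
                changelog.add("-U[" + key + ", " + previous + "]");
                changelog.add("+U[" + key + ", " + acc + "]");
            }
        }
        return changelog;
    }
}
```

A first bundle holding +5, +3 and a retraction of 5 for key a emits a single insert with the sum 3; a second bundle holding +4 for the same key emits an update-before with 3 followed by an update-after with 7.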

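A Brief Look at Local-Global

Local-Global is an optimization that automatically applies the two-phase aggregation idea to mitigate data skew (convenient, isn't it?), much like the Combiner introduced in MapReduce. Without further ado, here is the figure from the official documentation.

To enable Local-Global aggregation, set the following parameter in addition to enabling Mini-Batch.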

configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")

The optimizer rule behind Local-Global is named TwoStageOptimizedAggregateRule, and the physical nodes it produces are StreamExecLocalGroupAggregate (local aggregation) and StreamExecGlobalGroupAggregate (global aggregation). Their respective translateToPlanInternal() methods also use code generation to build the corresponding aggregate functions, MiniBatchLocalGroupAggFunction and MiniBatchGlobalGroupAggFunction. There is a fair amount of code, but the idea is just as clear, and readers can look it up themselves.
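
The two-phase idea itself fits in a few lines. The following is a minimal sketch assuming a COUNT aggregation and plain in-memory maps; LocalGlobalSketch, localAggregate and globalAggregate are hypothetical names and do not correspond to the Flink functions mentioned above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: pre-aggregate per upstream subtask, then merge the partial results. */
final class LocalGlobalSketch {

    /** Local phase: one upstream subtask collapses its own mini-batch into one partial count per key. */
    static Map<String, Long> localAggregate(List<String> batch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : batch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /** Global phase: merges the partial counts received from all local subtasks. */
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }
}
```

If one subtask sees hot, hot, hot, cold and another sees hot, hot, the global phase receives only two partial records for the skewed key hot (3 and 2) instead of five raw records, and still arrives at hot=5 and cold=1.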

