
          How Flink SQL Idle State Retention Time Is Implemented


          2021-06-03 08:10


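          Preface

          One of the mistakes that Flink SQL newcomers are most likely to make, in the author's view, is forgetting to set the idle state retention time, which leads to a state explosion.

          Why set it

          When we run a grouped query over a data stream, the results produced by group processing (not only aggregation results) are stored as intermediate state. As the number of grouping keys grows, the state grows with it. Most of this state is time-sensitive, however, and does not need to be kept forever. For example, when the Top-N syntax is used for deduplication, duplicates usually arrive within a specific interval (say, within an hour or a day), and once that interval has passed the corresponding state is no longer needed. Flink SQL's idle state retention time feature ensures that when the data for a key has gone without an update for longer than a threshold, that state entry is cleaned up automatically. It is set as follows: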

          stenv.getConfig().setIdleStateRetentionTime(Time.hours(24), Time.hours(36))

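          Note that setIdleStateRetentionTime() takes two parameters: the minimum state retention time minRetentionTime and the maximum state retention time maxRetentionTime (both chosen according to business needs), and the two must differ by at least 5 minutes. Why this restriction? A look at the source code makes it clear.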
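The 5-minute rule can be illustrated with a small standalone check. This is only a sketch of the constraint as described above, not Flink's own validation code: the class name `RetentionArgs`, the constant `MIN_GAP`, and the method `isValid` are hypothetical names introduced for illustration.

```java
import java.time.Duration;

final class RetentionArgs {
    // Assumption: the minimum gap comes from the 5-minute rule stated in the text.
    static final Duration MIN_GAP = Duration.ofMinutes(5);

    /** Returns true when max exceeds min by at least MIN_GAP. */
    static boolean isValid(Duration min, Duration max) {
        return max.minus(min).compareTo(MIN_GAP) >= 0;
    }

    public static void main(String[] args) {
        // 24h / 36h, the pair used in the article: the gap is 12 hours.
        System.out.println(isValid(Duration.ofHours(24), Duration.ofHours(36)));
        // 24h / 24h01m: the gap is only 1 minute.
        System.out.println(isValid(Duration.ofHours(24), Duration.ofHours(24).plusMinutes(1)));
    }
}
```

A pair such as (24 h, 36 h) passes the check, while a pair whose gap is below five minutes does not.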

          How it is implemented

          Under the hood, the idle state retention time feature is represented by the o.a.f.table.runtime.functions.CleanupState interface, shown below.

          public interface CleanupState {
              default void registerProcessingCleanupTimer(
                      ValueState<Long> cleanupTimeState,
                      long currentTime,
                      long minRetentionTime,
                      long maxRetentionTime,
                      TimerService timerService)
                      throws Exception {
                  // last registered timer
                  Long curCleanupTime = cleanupTimeState.value();

                  // check if a cleanup timer is registered and
                  // that the current cleanup timer won't delete state we need to keep
                  if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
                      // we need to register a new (later) timer
                      long cleanupTime = currentTime + maxRetentionTime;
                      // register timer and remember clean-up time
                      timerService.registerProcessingTimeTimer(cleanupTime);
                      // delete expired timer
                      if (curCleanupTime != null) {
                          timerService.deleteProcessingTimeTimer(curCleanupTime);
                      }
                      cleanupTimeState.update(cleanupTime);
                  }
              }
          }

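          As the code shows, the most recent cleanup time for each key is maintained separately in a ValueState. If either of the following two conditions holds:

          • the ValueState is empty (that is, this is the first time the key has appeared)

          • or the current time plus minRetentionTime is already later than the most recently registered cleanup time

          then a new Timer is registered at the current time plus maxRetentionTime, and its timestamp is stored in the ValueState to trigger the next cleanup. The previously registered Timer, now superseded, is deleted at the same time. It follows that if the gap between minRetentionTime and maxRetentionTime is too small, Timers are registered and the ValueState is updated fairly frequently, and the cost of maintaining Timers goes up (see the author's earlier article on how Timers work). A fairly long gap between the two values is therefore generally recommended.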
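To make the cost argument concrete, the decision logic above can be replayed outside Flink. The following is a minimal simulation, assuming one key that receives one element per time unit. The class `CleanupTimerSim` and its methods are hypothetical: a plain `Long` field stands in for the `ValueState`, and a `TreeSet` stands in for the `TimerService`.

```java
import java.util.TreeSet;

final class CleanupTimerSim {
    private final long minRetention;
    private final long maxRetention;
    private Long cleanupTime;                              // stand-in for ValueState<Long>
    private final TreeSet<Long> timers = new TreeSet<>();  // stand-in for TimerService
    private int registrations = 0;

    CleanupTimerSim(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    /** Applies the same condition as registerProcessingCleanupTimer for a single key. */
    void onElement(long currentTime) {
        if (cleanupTime == null || currentTime + minRetention > cleanupTime) {
            long newCleanupTime = currentTime + maxRetention;
            timers.add(newCleanupTime);          // register the new (later) timer
            if (cleanupTime != null) {
                timers.remove(cleanupTime);      // delete the superseded timer
            }
            cleanupTime = newCleanupTime;
            registrations++;
        }
    }

    int registrations() { return registrations; }
    Long cleanupTime() { return cleanupTime; }
    int pendingTimers() { return timers.size(); }

    /** Feeds one element at each of t = 0 .. steps-1 and returns the number of timer registrations. */
    static int run(long minRetention, long maxRetention, int steps) {
        CleanupTimerSim sim = new CleanupTimerSim(minRetention, maxRetention);
        for (long t = 0; t < steps; t++) {
            sim.onElement(t);
        }
        return sim.registrations();
    }

    public static void main(String[] args) {
        // Time units are abstract here; read them as hours.
        System.out.println("gap 12: " + run(24, 36, 72) + " registrations");
        System.out.println("gap 1:  " + run(24, 25, 72) + " registrations");
    }
}
```

In this simulation a wider gap between the two values means that most elements leave the existing timer untouched, while a narrow gap forces a delete-and-register cycle far more often for the same input.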

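          The inheritance hierarchy of the CleanupState interface is shown in the figure below.

          As can be seen, many Functions support idle state cleanup, but they all share the abstract class KeyedProcessFunctionWithCleanupState as their base class. Its source code is as follows.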

          public abstract class KeyedProcessFunctionWithCleanupState<K, IN, OUT>
                  extends KeyedProcessFunction<K, IN, OUT> implements CleanupState {
              private static final long serialVersionUID = 2084560869233898457L;

              private final long minRetentionTime;
              private final long maxRetentionTime;
              protected final boolean stateCleaningEnabled;

              // holds the latest registered cleanup timer
              private ValueState<Long> cleanupTimeState;

              public KeyedProcessFunctionWithCleanupState(long minRetentionTime, long maxRetentionTime) {
                  this.minRetentionTime = minRetentionTime;
                  this.maxRetentionTime = maxRetentionTime;
                  this.stateCleaningEnabled = minRetentionTime > 1;
              }

              protected void initCleanupTimeState(String stateName) {
                  if (stateCleaningEnabled) {
                      ValueStateDescriptor<Long> inputCntDescriptor =
                              new ValueStateDescriptor<>(stateName, Types.LONG);
                      cleanupTimeState = getRuntimeContext().getState(inputCntDescriptor);
                  }
              }

              protected void registerProcessingCleanupTimer(Context ctx, long currentTime) throws Exception {
                  if (stateCleaningEnabled) {
                      registerProcessingCleanupTimer(
                              cleanupTimeState,
                              currentTime,
                              minRetentionTime,
                              maxRetentionTime,
                              ctx.timerService());
                  }
              }

              protected boolean isProcessingTimeTimer(OnTimerContext ctx) {
                  return ctx.timeDomain() == TimeDomain.PROCESSING_TIME;
              }

              protected void cleanupState(State... states) {
                  for (State state : states) {
                      state.clear();
                  }
                  this.cleanupTimeState.clear();
              }

              protected Boolean needToCleanupState(Long timestamp) throws IOException {
                  if (stateCleaningEnabled) {
                      Long cleanupTime = cleanupTimeState.value();
                      // check that the triggered timer is the last registered processing time timer.
                      return timestamp.equals(cleanupTime);
                  } else {
                      return false;
                  }
              }
          }

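          Two things stand out: idle state retention time currently still supports only processing-time semantics, and cleanup is enabled only when minRetentionTime is greater than 1 (the constructor checks minRetentionTime > 1), so it must be set to a positive value to take effect.

          KeyedProcessFunctionWithCleanupState only provides a few helper methods; to see them at work we need to look at the implementing classes. Take AppendOnlyTopNFunction, which computes Top-N, as an example. Its processElement() method calls the cleanup Timer registration for every incoming element: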

          @Override
          public void processElement(RowData input, Context context, Collector<RowData> out) throws Exception {
              long currentTime = context.timerService().currentProcessingTime();
              // register state-cleanup timer
              registerProcessingCleanupTimer(context, currentTime);
              // ......
          }

          Once the Timer fires, the onTimer() method calls the base class's cleanupState() method to perform the actual cleanup:

          @Override
          public void onTimer(
                  long timestamp,
                  OnTimerContext ctx,
                  Collector<RowData> out) throws Exception {
              if (stateCleaningEnabled) {
                  // cleanup cache
                  kvSortedMap.remove(keyContext.getCurrentKey());
                  cleanupState(dataState);
              }
          }
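The interplay of processElement() and onTimer() can be sketched end to end without Flink. The example below is a simplified model under stated assumptions, not the AppendOnlyTopNFunction implementation: `IdleStateSim`, `advanceTo`, and the per-key counter are hypothetical, the two `HashMap`s stand in for keyed state, and time is advanced by hand instead of by a processing-time clock.

```java
import java.util.HashMap;
import java.util.Map;

final class IdleStateSim {
    private final long minRetention;
    private final long maxRetention;
    private final Map<String, Long> counts = new HashMap<>();        // per-key business state
    private final Map<String, Long> cleanupTimes = new HashMap<>();  // per-key cleanup timer

    IdleStateSim(long minRetention, long maxRetention) {
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
    }

    /** Updates the key's state and, if needed, pushes its cleanup timer further out. */
    void processElement(String key, long now) {
        counts.merge(key, 1L, Long::sum);
        Long current = cleanupTimes.get(key);
        if (current == null || now + minRetention > current) {
            cleanupTimes.put(key, now + maxRetention);  // replaces the superseded timer
        }
    }

    /** Fires every timer with timestamp <= now and clears the state of its key. */
    void advanceTo(long now) {
        cleanupTimes.entrySet().removeIf(timer -> {
            boolean fired = timer.getValue() <= now;
            if (fired) {
                counts.remove(timer.getKey());
            }
            return fired;
        });
    }

    boolean hasState(String key) { return counts.containsKey(key); }
    long count(String key) { return counts.getOrDefault(key, 0L); }

    public static void main(String[] args) {
        IdleStateSim sim = new IdleStateSim(24, 36);
        sim.processElement("a", 0);   // "a" is seen once and then goes idle
        sim.processElement("b", 0);
        sim.processElement("b", 20);  // "b" is seen again, so its timer moves out
        sim.advanceTo(36);
        System.out.println("a kept: " + sim.hasState("a") + ", b kept: " + sim.hasState("b"));
    }
}
```

In this model an idle key loses its state at some point between minRetentionTime and maxRetentionTime after its last update, and a key that reappears after cleanup starts again from empty state.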

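          The idle state retention logic is not limited to the Functions above. The Table/SQL module also has a built-in trigger, StateCleaningCountTrigger, which counts the elements in a window and cleans up (that is, FIRE_AND_PURGE) based on either a count threshold or the idle state retention time threshold. Readers can consult the corresponding source code for details.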





