<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          徹底搞清Flink中的Window(Flink版本1.8)

          共 16984字,需瀏覽 34分鐘

           ·

          2021-03-31 19:24

          flink-window

          窗口

          在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個(gè)消息就處理一次,但是有時(shí)我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點(diǎn)擊了我們的網(wǎng)頁(yè)。在這種情況下,我們必須定義一個(gè)窗口,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對(duì)這個(gè)窗口內(nèi)的數(shù)據(jù)進(jìn)行計(jì)算。

          Flink 認(rèn)為 Batch 是 Streaming 的一個(gè)特例,所以 Flink 底層引擎是一個(gè)流式引擎,在上面實(shí)現(xiàn)了流處理和批處理。而窗口(window)就是從 Streaming 到 Batch 的一個(gè)橋梁。

          • 一個(gè)Window代表有限對(duì)象的集合。一個(gè)窗口有一個(gè)最大的時(shí)間戳,該時(shí)間戳意味著在其代表的某時(shí)間點(diǎn)——所有應(yīng)該進(jìn)入這個(gè)窗口的元素都已經(jīng)到達(dá)

          • Window就是用來對(duì)一個(gè)無(wú)限的流設(shè)置一個(gè)有限的集合,在有界的數(shù)據(jù)集上進(jìn)行操作的一種機(jī)制。window又可以分為基于時(shí)間(Time-based)的window以及基于數(shù)量(Count-based)的window。

          • Flink DataStream API提供了Time和Count的window,同時(shí)增加了基于Session的window。同時(shí),由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用戶自定義window。

          窗口的組成

          窗口分配器

          • assignWindows將某個(gè)帶有時(shí)間戳timestamp的元素element分配給一個(gè)或多個(gè)窗口,并返回窗口集合

          • getDefaultTrigger 返回跟WindowAssigner關(guān)聯(lián)的默認(rèn)觸發(fā)器

          • getWindowSerializer返回WindowAssigner分配的窗口的序列化器

          • 窗口分配器定義如何將數(shù)據(jù)元分配給窗口。這是通過WindowAssigner 在window(…)(對(duì)于被Keys化流)或windowAll()(對(duì)于非被Keys化流)調(diào)用中指定您的選擇來完成的。

          • WindowAssigner負(fù)責(zé)將每個(gè)傳入數(shù)據(jù)元分配給一個(gè)或多個(gè)窗口。Flink帶有預(yù)定義的窗口分配器,用于最常見的用例
            即翻滾窗口, 滑動(dòng)窗口,會(huì)話窗口和全局窗口。

          • 您還可以通過擴(kuò)展WindowAssigner類來實(shí)現(xiàn)自定義窗口分配器。

          • 所有內(nèi)置窗口分配器(全局窗口除外)都根據(jù)時(shí)間為窗口分配數(shù)據(jù)元,這可以是處理時(shí)間或事件時(shí)間。

          State

          • 狀態(tài),用來存儲(chǔ)窗口內(nèi)的元素,如果有 AggregateFunction,則存儲(chǔ)的是增量聚合的中間結(jié)果。

          窗口函數(shù)

          選擇合適的計(jì)算函數(shù),減少開發(fā)代碼量提高系統(tǒng)性能

          增量聚合函數(shù)(窗口只維護(hù)狀態(tài))

          • ReduceFunction

          • AggregateFunction

          • FoldFunction

          全量聚合函數(shù)(窗口維護(hù)窗口內(nèi)的數(shù)據(jù))

          • ProcessWindowFunction

          • 全量計(jì)算

          • 支持功能更加靈活

          • 支持狀態(tài)操作

          觸發(fā)器

          image-20210202200655485
          • EventTimeTrigger基于事件時(shí)間的觸發(fā)器,對(duì)應(yīng)onEventTime

          • ProcessingTimeTrigger
            基于當(dāng)前系統(tǒng)時(shí)間的觸發(fā)器,對(duì)應(yīng)onProcessingTime
            ProcessingTime 有最好的性能和最低的延遲。但在分布式計(jì)算環(huán)境中ProcessingTime具有不確定性,相同數(shù)據(jù)流多次運(yùn)行有可能產(chǎn)生不同的計(jì)算結(jié)果。

          • ContinuousEventTimeTrigger

          • ContinuousProcessingTimeTrigger

          • CountTrigger

          • Trigger確定何時(shí)窗口函數(shù)準(zhǔn)備好處理窗口(由窗口分配器形成)。每個(gè)都有默認(rèn)值。
            如果默認(rèn)觸發(fā)器不符合您的需要,您可以使用指定自定義觸發(fā)器。WindowAssignerTriggertrigger(…)

          • 觸發(fā)器界面有五種方法可以Trigger對(duì)不同的事件做出反應(yīng):

            • onElement()為添加到窗口的每個(gè)數(shù)據(jù)元調(diào)用該方法。

            • onEventTime()在注冊(cè)的事件時(shí)間計(jì)時(shí)器觸發(fā)時(shí)調(diào)用該方法。

            • onProcessingTime()在注冊(cè)的處理時(shí)間計(jì)時(shí)器觸發(fā)時(shí)調(diào)用該方法。

            • 該onMerge()方法與狀態(tài)觸發(fā)器相關(guān),并且當(dāng)它們的相應(yīng)窗口合并時(shí)合并兩個(gè)觸發(fā)器的狀態(tài),例如當(dāng)使用會(huì)話窗口時(shí)。

            • 最后,該clear()方法在移除相應(yīng)窗口時(shí)執(zhí)行所需的任何動(dòng)作。

          • 默認(rèn)觸發(fā)器

            • 默認(rèn)觸發(fā)器GlobalWindow是NeverTrigger從不觸發(fā)的。因此,在使用時(shí)必須定義自定義觸發(fā)器GlobalWindow。

            • 通過使用trigger()您指定觸發(fā)器會(huì)覆蓋a的默認(rèn)觸發(fā)器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows則不再根據(jù)時(shí)間進(jìn)度獲取窗口,
              而是僅按計(jì)數(shù)。現(xiàn)在,如果你想根據(jù)時(shí)間和數(shù)量做出反應(yīng),你必須編寫自己的自定義觸發(fā)器。

            • event-time窗口分配器都有一個(gè)EventTimeTrigger作為默認(rèn)觸發(fā)器。該觸發(fā)器在watermark通過窗口末尾時(shí)出發(fā)。 

          觸發(fā)器分類

          CountTrigger

          一旦窗口中的數(shù)據(jù)元數(shù)量超過給定限制,就會(huì)觸發(fā)。所以其觸發(fā)機(jī)制實(shí)現(xiàn)在onElement中

          ProcessingTimeTrigger

          基于處理時(shí)間的觸發(fā)。

          EventTimeTrigger

          根據(jù) watermarks 度量的事件時(shí)間進(jìn)度進(jìn)行觸發(fā)。

          PurgingTrigger
          • 另一個(gè)觸發(fā)器作為參數(shù)作為參數(shù)并將其轉(zhuǎn)換為清除觸發(fā)器。

          • 其作用是在 Trigger 觸發(fā)窗口計(jì)算之后將窗口的 State 中的數(shù)據(jù)清除。

          • image-20210202200710573

            前兩條數(shù)據(jù)先后于20:01和20:02進(jìn)入窗口,此時(shí) State 中的值更新為3,同時(shí)到了Trigger的觸發(fā)時(shí)間,輸出結(jié)果為3。


            image-20210202200733128
          • 由于 PurgingTrigger 的作用,State 中的數(shù)據(jù)會(huì)被清除。

          image-20210202200744793
          DeltaTrigger
          DeltaTrigger 的應(yīng)用
          • 有這樣一個(gè)車輛區(qū)間測(cè)試的需求,車輛每分鐘上報(bào)當(dāng)前位置與車速,每行進(jìn)10公里,計(jì)算區(qū)間內(nèi)最高車速。

          image-20210202200802480

          觸發(fā)器原型

          • onElement

          • onProcessingTime

          • onEventTime

          • onMerge

          • clear

          說明

          • TriggerResult可以是以下之一

            • CONTINUE 什么都不做

            • FIRE_AND_PURGE 觸發(fā)計(jì)算,然后清除窗口中的元素

            • FIRE 觸發(fā)計(jì)算  默認(rèn)情況下,內(nèi)置的觸發(fā)器只返回 FIRE,不會(huì)清除窗口狀態(tài)。

            • PURGE 清除窗口中的元素  

          • 所有的事件時(shí)間窗口分配器都有一個(gè) EventTimeTrigger 作為默認(rèn)觸發(fā)器。一旦 watermark 到達(dá)窗口末尾,這個(gè)觸發(fā)器就會(huì)被觸發(fā)。 

          • 全局窗口(GlobalWindow)的默認(rèn)觸發(fā)器是永不會(huì)被觸發(fā)的 NeverTrigger。因此,在使用全局窗口時(shí),必須自定義一個(gè)觸發(fā)器。

          • 通過使用 trigger() 方法指定觸發(fā)器,將會(huì)覆蓋窗口分配器的默認(rèn)觸發(fā)器。例如,如果你為 TumblingEventTimeWindows 指定 CountTrigger,
            那么不會(huì)再根據(jù)時(shí)間進(jìn)度觸發(fā)窗口,而只能通過計(jì)數(shù)。目前為止,如果你希望基于時(shí)間以及計(jì)數(shù)進(jìn)行觸發(fā),則必須編寫自己的自定義觸發(fā)器。

          窗口的分類

          • 根據(jù)窗口是否調(diào)用keyBy算子key化,分為被Keys化Windows和非被Keys化Windows;

          flink window圖解
          • 根據(jù)窗口的驅(qū)動(dòng)方式,分為時(shí)間驅(qū)動(dòng)(Time Window)、數(shù)據(jù)驅(qū)動(dòng)(Count Window);

          • 根據(jù)窗口的元素分配方式,分為滾動(dòng)窗口(tumbling windows)、滑動(dòng)窗口(sliding windows)、會(huì)話窗口(session windows)以及全局窗口(global windows)

          被Keys化Windows

          可以理解為按照原始數(shù)據(jù)流中的某個(gè)key進(jìn)行分類,擁有同一個(gè)key值的數(shù)據(jù)流將為進(jìn)入同一個(gè)window,多個(gè)窗口并行的邏輯流

          stream
                 .keyBy(...)               <-  keyed versus non-keyed windows
                 .window(...)              <-  required: "assigner"
                [.trigger(...)]            <-  optional: "trigger" (else default trigger)
                [.evictor(...)]            <-  optional: "evictor" (else no evictor)
                [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
                [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
                 .reduce/aggregate/fold/apply()      <-  required: "function"
                [.getSideOutput(...)]      <-  optional: "output tag"

          非被Keys化Windows

          • 不做分類,每進(jìn)入一條數(shù)據(jù)即增加一個(gè)窗口,多個(gè)窗口并行,每個(gè)窗口處理1條數(shù)據(jù)

          • WindowAll 將元素按照某種特性聚集在一起,該函數(shù)不支持并行操作,默認(rèn)的并行度就是1,所以如果使用這個(gè)算子的話需要注意一下性能問題

            stream
                 .windowAll(...)           <-  required: "assigner"
                [.trigger(...)]            <-  optional: "trigger" (else default trigger)
                [.evictor(...)]            <-  optional: "evictor" (else no evictor)
                [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
                [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
                 .reduce/aggregate/fold/apply()      <-  required: "function"
                [.getSideOutput(...)]      <-  optional: "output tag"

          區(qū)別

          • 對(duì)于被Key化的數(shù)據(jù)流,可以將傳入事件的任何屬性用作鍵(此處有更多詳細(xì)信息)。

          • 擁有被Key化的數(shù)據(jù)流將允許您的窗口計(jì)算由多個(gè)任務(wù)并行執(zhí)行,因?yàn)槊總€(gè)邏輯被Key化的數(shù)據(jù)流可以獨(dú)立于其余任務(wù)進(jìn)行處理。
            引用相同Keys的所有數(shù)據(jù)元將被發(fā)送到同一個(gè)并行任務(wù)。

          Time-Based window(基于時(shí)間的窗口)

          每一條記錄來了以后會(huì)根據(jù)時(shí)間屬性值采用不同的window assinger 方法分配給一個(gè)或者多個(gè)窗口,分為滾動(dòng)窗口(Tumbling windows)和滑動(dòng)窗口(Sliding windows)。

          • EventTime 數(shù)據(jù)本身攜帶的時(shí)間,默認(rèn)的時(shí)間屬性;

          • ProcessingTime 處理時(shí)間;

          • IngestionTime 數(shù)據(jù)進(jìn)入flink程序的時(shí)間;

          Tumbling windows(滾動(dòng)窗口)

          滾動(dòng)窗口下窗口之間不重疊,且窗口長(zhǎng)度是固定的。我們可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows創(chuàng)建一個(gè)基于Event Time或Processing Time的滾動(dòng)時(shí)間窗口。

          tumb-window

          下面示例以滾動(dòng)時(shí)間窗口(TumblingEventTimeWindows)為例,默認(rèn)模式是TimeCharacteristic.ProcessingTime處理時(shí)間

          /** The time characteristic that is used if none other is set. */
          private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

          所以如果使用Event Time即數(shù)據(jù)的實(shí)際產(chǎn)生時(shí)間,需要通過senv.setStreamTimeCharacteristic指定

          // 指定使用數(shù)據(jù)的實(shí)際時(shí)間
          senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

          DataStream<T> input = ...;

          // tumbling event-time windows
          input
              .keyBy(<key selector>)
              .window(TumblingEventTimeWindows.of(Time.seconds(5)))
              .<windowed transformation>(<window function>);

          // tumbling processing-time windows
          input
              .keyBy(<key selector>)
              .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
              .<windowed transformation>(<window function>);

          // 這里減去8小時(shí),表示用UTC世界時(shí)間
          input
              .keyBy(<key selector>)
              .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
              .<windowed transformation>(<window function>);

          Sliding windows(滑動(dòng)窗口)

          滑動(dòng)窗口以一個(gè)步長(zhǎng)(Slide)不斷向前滑動(dòng),窗口的長(zhǎng)度固定。使用時(shí),我們要設(shè)置Slide和Size。Slide的大小決定了Flink以多大的頻率來創(chuàng)建新的窗口,Slide較小,窗口的個(gè)數(shù)會(huì)很多。Slide小于窗口的Size時(shí),相鄰窗口會(huì)重疊,一個(gè)事件會(huì)被分配到多個(gè)窗口;Slide大于Size,有些事件可能被丟掉。

          slide-window

          同理,如果是滑動(dòng)時(shí)間窗口,也是類似的:

          // 窗口的大小是10s,每5s滑動(dòng)一次,也就是5s計(jì)算一次
          .timeWindow(Time.seconds(10), Time.seconds(5))

          這里使用的是timeWindow,通常使用window,那么兩者的區(qū)別是什么呢?

          timeWindow其實(shí)判斷時(shí)間的處理模式是ProcessingTime還是SlidingEventTimeWindows,幫我們判斷好了,調(diào)用方法直接傳入(Time size, Time slide)這兩個(gè)參數(shù)就好了,如果是使用.window方法,則需要自己來判斷,就是前者寫法更簡(jiǎn)單一些。

          public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
              if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
                  return window(SlidingProcessingTimeWindows.of(size, slide));
              } else {
                  return window(SlidingEventTimeWindows.of(size, slide));
              }
          }

          Count-Based window (基于計(jì)數(shù)的窗口)

          Count Window 是根據(jù)元素個(gè)數(shù)對(duì)數(shù)據(jù)流進(jìn)行分組的,也分滾動(dòng)(tumb)和滑動(dòng)(slide)。

          Tumbling Count Window
          當(dāng)我們想要每100個(gè)用戶購(gòu)買行為事件統(tǒng)計(jì)購(gòu)買總數(shù),那么每當(dāng)窗口中填滿100個(gè)元素了,就會(huì)對(duì)窗口進(jìn)行計(jì)算,這種窗口我們稱之為翻滾計(jì)數(shù)窗口(Tumbling Count Window),上圖所示窗口大小為3個(gè)。通過使用 DataStream API,我們可以這樣實(shí)現(xiàn):

          // Stream of (userId, buyCnts)
          val buyCnts: DataStream[(Int, Int)] = ...

          val tumblingCnts: DataStream[(Int, Int)] = buyCnts
            // key stream by sensorId
            .keyBy(0)
            // tumbling count window of 100 elements size
            .countWindow(100)
            // compute the buyCnt sum 
            .sum(1)

          Sliding Count Window
          當(dāng)然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是類似的,例如計(jì)算每10個(gè)元素計(jì)算一次最近100個(gè)元素的總和,代碼示例如下。

          val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
            .keyBy(0)
            // sliding count window of 100 elements size and 10 elements trigger interval
            .countWindow(10010)
            .sum(1)

          會(huì)話(session)窗口

          • SessionWindow中的Gap是一個(gè)非常重要的概念,它指的是session之間的間隔。

          • 如果session之間的間隔大于指定的間隔,數(shù)據(jù)將會(huì)被劃分到不同的session中。比如,設(shè)定5秒的間隔,0-5屬于一個(gè)session,5-10屬于另一個(gè)session

          session-window
          DataStream<T> input = ...;

          // event-time session windows with static gap
          input
              .keyBy(<key selector>)
              .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
              .<windowed transformation>(<window function>);

          // event-time session windows with dynamic gap
          input
              .keyBy(<key selector>)
              .window(EventTimeSessionWindows.withDynamicGap((element) -> {
                  // determine and return session gap
              }))
              .<windowed transformation>(<window function>);

          // processing-time session windows with static gap
          input
              .keyBy(<key selector>)
              .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
              .<windowed transformation>(<window function>);

          // processing-time session windows with dynamic gap
          input
              .keyBy(<key selector>)
              .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
                  // determine and return session gap
              }))
              .<windowed transformation>(<window function>);

          Global Windows(全局窗口)

          global-window

          總結(jié)

          SlidingEventTimeWindows,  
          SlidingProcessingTimeWindows,  
          TumblingEventTimeWindows,  
          TumblingProcessingTimeWindows

          • 基于時(shí)間的滑動(dòng)窗口

            • SlidingEventTimeWindows

            • SlidingProcessingTimeWindows

          • 基于時(shí)間的翻滾窗口

            • TumblingEventTimeWindows

            • TumblingProcessingTimeWindows

          • 基于計(jì)數(shù)的滑動(dòng)窗口

            • countWindow(100, 10)

          • 基于計(jì)數(shù)的翻滾窗口

            • countWindow(100)

          • 會(huì)話窗口
            會(huì)話窗口:一條記錄一個(gè)窗口

            • ProcessingTimeSessionWindows

            • EventTimeSessionWindows

          • 全局窗口(GlobalWindows)

            • GlobalWindow是一個(gè)全局窗口,被實(shí)現(xiàn)為單例模式。其maxTimestamp被設(shè)置為L(zhǎng)ong.MAX_VALUE。

            • 該類內(nèi)部有一個(gè)靜態(tài)類定義了GlobalWindow的序列化器:Serializer。

          延遲

          默認(rèn)情況下,當(dāng)水印超過窗口末尾時(shí),會(huì)刪除延遲數(shù)據(jù)元。
          但是,F(xiàn)link允許為窗口 算子指定最大允許延遲。允許延遲指定數(shù)據(jù)元在被刪除之前可以延遲多少時(shí)間,并且其默認(rèn)值為0.
          在水印通過窗口結(jié)束之后但在通過窗口結(jié)束加上允許的延遲之前到達(dá)的數(shù)據(jù)元,仍然添加到窗口中。
          根據(jù)使用的觸發(fā)器,延遲但未丟棄的數(shù)據(jù)元可能會(huì)導(dǎo)致窗口再次觸發(fā)。就是這種情況EventTimeTrigger。

          當(dāng)指定允許的延遲大于0時(shí),在水印通過窗口結(jié)束后保持窗口及其內(nèi)容。在這些情況下,當(dāng)遲到但未掉落的數(shù)據(jù)元到達(dá)時(shí),它可能觸發(fā)窗口的另一次觸發(fā)。
          這些射擊被稱為late firings,因?yàn)樗鼈兪怯蛇t到事件觸發(fā)的,與之相反的main firing 是窗口的第一次射擊。在會(huì)話窗口的情況下,后期點(diǎn)火可以進(jìn)一步導(dǎo)致窗口的合并,因?yàn)樗鼈兛梢浴皹蚪印眱蓚€(gè)預(yù)先存在的未合并窗口之間的間隙。
          后期觸發(fā)發(fā)出的數(shù)據(jù)元應(yīng)該被視為先前計(jì)算的更新結(jié)果,即,您的數(shù)據(jù)流將包含同一計(jì)算的多個(gè)結(jié)果。根據(jù)您的應(yīng)用程序,您需要考慮這些重復(fù)的結(jié)果或?qū)ζ溥M(jìn)行重復(fù)數(shù)據(jù)刪除。

          窗口的使用

          • Flink為每個(gè)窗口創(chuàng)建一個(gè)每個(gè)數(shù)據(jù)元的副本。鑒于此,翻滾窗口保存每個(gè)數(shù)據(jù)元的一個(gè)副本(一個(gè)數(shù)據(jù)元恰好屬于一個(gè)窗口,除非它被延遲)
            動(dòng)窗口會(huì)每個(gè)數(shù)據(jù)元?jiǎng)?chuàng)建幾個(gè)復(fù)本,如“ 窗口分配器”部分中所述。因此,尺寸為1天且滑動(dòng)1秒的滑動(dòng)窗口可能不是一個(gè)好主意。

          • ReduceFunction,AggregateFunction并且FoldFunction可以顯著降低存儲(chǔ)要求,因?yàn)樗鼈兗鼻械鼐酆蠑?shù)據(jù)元并且每個(gè)窗口只存儲(chǔ)一個(gè)值。
            相反,僅使用 ProcessWindowFunction需要累積所有數(shù)據(jù)元。

          Evictor

          • 它剔除元素的時(shí)機(jī)是:在觸發(fā)器觸發(fā)之后,在窗口被處理(apply windowFunction)之前

          • Flink 的窗口模型允許在窗口分配器和觸發(fā)器之外指定一個(gè)可選的驅(qū)逐器(Evictor)。可以使用 evictor(…) 方法來完成。
            驅(qū)逐器能夠在觸發(fā)器觸發(fā)之后,以及在應(yīng)用窗口函數(shù)之前或之后從窗口中移除元素

          • 默認(rèn)情況下,所有內(nèi)置的驅(qū)逐器在窗口函數(shù)之前使用

          • 指定驅(qū)逐器可以避免預(yù)聚合(pre-aggregation),因?yàn)榇翱趦?nèi)所有元素必須在應(yīng)用計(jì)算之前傳遞給驅(qū)逐器。

          • Flink不保證窗口內(nèi)元素的順序。這意味著雖然驅(qū)逐者可以從窗口的開頭移除元素,但這些元素不一定是先到的還是后到的。

          內(nèi)置的Evitor

          • TimeEvitor

            • 以毫秒為單位的時(shí)間間隔作為參數(shù),對(duì)于給定的窗口,找到元素中的最大的時(shí)間戳max_ts,并刪除時(shí)間戳小于max_ts - interval的所有元素。

            • 本質(zhì)上是將罪行的元素選出來

          • CountEvitor

            • 保持窗口內(nèi)元素?cái)?shù)量符合用戶指定數(shù)量,如果多于用戶指定的數(shù)量,從窗口緩沖區(qū)的開頭丟棄剩余的元素。

          • DeltaEvitor

            • 使用 DeltaFunction和 一個(gè)閾值,計(jì)算窗口緩沖區(qū)中的最后一個(gè)元素與其余每個(gè)元素之間的 delta 值,并刪除 delta 值大于或等于閾值的元素。

            • 通過定義的DeltaFunction 和 Threshold ,計(jì)算窗口中元素和最新元素的 Delta 值,將Delta 值超過 Threshold的元素刪除

          watermark

          • watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性。

          • watermark Apache Flink為了處理EventTime 窗口計(jì)算提出的一種機(jī)制,本質(zhì)上也是一種時(shí)間戳,
            由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統(tǒng)Event,
            與普通數(shù)據(jù)流Event一樣流轉(zhuǎn)到對(duì)應(yīng)的下游算子,接收到Watermark Event的算子以此不斷調(diào)整自己管理的EventTime clock。
            算子接收到一個(gè)Watermark時(shí)候,框架知道不會(huì)再有任何小于該Watermark的時(shí)間戳的數(shù)據(jù)元素到來了,所以Watermark可以看做是告訴Apache Flink框架數(shù)據(jù)流已經(jīng)處理到什么位置(時(shí)間維度)的方式。

          • 通常基于Event Time的數(shù)據(jù),自身都包含一個(gè)timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。

          • waterMark 的觸發(fā)時(shí)間機(jī)制(waterMark >= window_end_time)

            • 當(dāng)?shù)谝淮斡|發(fā)之后,以后所有到達(dá)的該窗口的數(shù)據(jù)(遲到數(shù)據(jù))都會(huì)觸發(fā)該窗口

            • 定義允許延遲,所以 waterMark

              =window_end_time+allowedLateness 是窗口被關(guān)閉,數(shù)據(jù)被丟棄
            • 對(duì)于out-of-order的數(shù)據(jù),F(xiàn)link可以通過watermark機(jī)制結(jié)合window的操作,來處理一定范圍內(nèi)的亂序數(shù)據(jù),(新進(jìn)來的數(shù)據(jù))晚于前面進(jìn)來的數(shù)據(jù),但是該數(shù)據(jù)所在窗口沒有被觸發(fā),
              這個(gè)時(shí)候數(shù)據(jù)還是有效的——EventTime<WaterMark 的

            • 對(duì)于out-of-order的數(shù)據(jù),延遲太多

            • 注意,如果不定義允許最大遲到時(shí)間,并且在有很多數(shù)據(jù)遲到的情況下,會(huì)嚴(yán)重影響正確結(jié)果,只要Event Time < watermark時(shí)間就會(huì)觸發(fā)窗口,也就是說遲到的每一條數(shù)據(jù)都會(huì)觸發(fā)
              該窗口

          產(chǎn)生方式

          • Punctuated

            • 數(shù)據(jù)流中每一個(gè)遞增的EventTime都會(huì)產(chǎn)生一個(gè)Watermark(其實(shí)是根據(jù)某個(gè)計(jì)算條件來做判斷)。

            • 在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成。

            • 每個(gè)事件都會(huì)攜帶事件,可以根據(jù)該時(shí)間產(chǎn)生一個(gè)watermark 或者可以根據(jù)事件攜帶的其他標(biāo)志——業(yè)務(wù)的結(jié)束標(biāo)志

          • Periodic - 周期性的(一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個(gè)Watermark。

            在實(shí)際的生產(chǎn)中Periodic的方式必須結(jié)合時(shí)間和積累條數(shù)兩個(gè)維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會(huì)有很大的延時(shí)。

          背景

          • 流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的

          • 但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。 

          • 對(duì)于late element,我們又不能無(wú)限期的等下去,必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了

          • 它表示當(dāng)達(dá)到watermark到達(dá)之后,在watermark之前的數(shù)據(jù)已經(jīng)全部達(dá)到(即使后面還有延遲的數(shù)據(jù)

          解決的問題

          • Watermark的時(shí)間戳可以和Event中的EventTime 一致,也可以自己定義任何合理的邏輯使得Watermark的時(shí)間戳不等于Event中的EventTime,
            Event中的EventTime自產(chǎn)生那一刻起就不可以改變了,不受Apache Flink框架控制,
            而Watermark的產(chǎn)生是在Apache Flink的Source節(jié)點(diǎn)或?qū)崿F(xiàn)的Watermark生成器計(jì)算產(chǎn)生(如上Apache Flink內(nèi)置的 Periodic Watermark實(shí)現(xiàn)),
            Apache Flink內(nèi)部對(duì)單流或多流的場(chǎng)景有統(tǒng)一的Watermark處理。

          • 默認(rèn)情況下小于watermark 時(shí)間戳的event 會(huì)被丟棄嗎

          多流waterMark

          • 在實(shí)際的流計(jì)算中往往一個(gè)job中會(huì)處理多個(gè)Source的數(shù)據(jù),對(duì)Source的數(shù)據(jù)進(jìn)行GroupBy分組,那么來自不同Source的相同key值會(huì)shuffle到同一個(gè)處理節(jié)點(diǎn),
            并攜帶各自的Watermark,Apache Flink內(nèi)部要保證Watermark要保持單調(diào)遞增,多個(gè)Source的Watermark匯聚到一起時(shí)候可能不是單調(diào)自增的

          • Apache Flink內(nèi)部實(shí)現(xiàn)每一個(gè)邊上只能有一個(gè)遞增的Watermark, 當(dāng)出現(xiàn)多流攜帶Eventtime匯聚到一起(GroupBy or Union)時(shí)候,
            Apache Flink會(huì)選擇所有流入的Eventtime中最小的一個(gè)向下游流出。從而保證watermark的單調(diào)遞增和保證數(shù)據(jù)的完整性

          理解

          • 默認(rèn)情況下watermark 已經(jīng)觸發(fā)過得窗口,即使有新數(shù)據(jù)(遲到)落進(jìn)去不會(huì)被計(jì)算 ,遲到的意思

            watermark>=window_n_end_time && window_n_start_time<=vent_time<window_n_end_time(即數(shù)據(jù)屬于這個(gè)窗口)

          • 允許遲到
            watermark>=window_n_end_time && watermark<window_n_end_time+lateness && window_n_start_time<=vent_time<window_n_end_time
            在 watermark 大于窗口結(jié)束時(shí)間不超過特定延遲范圍時(shí),落在此窗口內(nèi)的數(shù)據(jù)是有效的,可以觸發(fā)窗口。

          窗口聚合

          • 增量聚合

            • 窗口內(nèi)來一條數(shù)據(jù)就計(jì)算一次

          • 全量聚合

            • 一次計(jì)算整個(gè)窗口里的所有元素(可以進(jìn)行排序,一次一批可以針對(duì)外部鏈接)

            • 使用

            • 窗口之后調(diào)用 apply ,創(chuàng)建的元素里面方法的參數(shù)是一個(gè)迭代器

          常用的一些方法

          • window

          • timeWindow和 countWind

          • process 和 apply

          AssignerWithPeriodicWatermarks或接口AssignerWithPunctuatedWatermarks。
          簡(jiǎn)而言之,前一個(gè)接口將會(huì)周期性發(fā)送Watermark,而第二個(gè)接口根據(jù)一些到達(dá)數(shù)據(jù)的屬性,例如一旦在流中碰到一個(gè)特殊的element便發(fā)送Watermark。

          自定義窗口

          • Window Assigner:負(fù)責(zé)將元素分配到不同的window。

          • Trigger即觸發(fā)器,定義何時(shí)或什么情況下Fire一個(gè)window。

          • 對(duì)于CountWindow,我們可以直接使用已經(jīng)定義好的Trigger:CountTrigger trigger(CountTrigger.of(2))

          • Evictor(可選) 驅(qū)逐者,即保留上一window留下的某些元素。

          • 最簡(jiǎn)單的情況,如果業(yè)務(wù)不是特別復(fù)雜,僅僅是基于Time和Count,我們其實(shí)可以用系統(tǒng)定義好的WindowAssigner以及Trigger和Evictor來實(shí)現(xiàn)不同的組合:

          window 出現(xiàn)數(shù)據(jù)傾斜

          • window 產(chǎn)生數(shù)據(jù)傾斜指的是數(shù)據(jù)在不同的窗口內(nèi)堆積的數(shù)據(jù)量相差過多。本質(zhì)上產(chǎn)生這種情況的原因是數(shù)據(jù)源頭發(fā)送的數(shù)據(jù)量速度不同導(dǎo)致的。出現(xiàn)這種情況一般通過兩種方式來解決:

          • 在數(shù)據(jù)進(jìn)入窗口前做預(yù)聚合;

          • 重新設(shè)計(jì)窗口聚合的 key;


          猜你喜歡
          Flink實(shí)時(shí)計(jì)算topN熱榜
          數(shù)倉(cāng)建模分層理論
          Hive中的集合數(shù)據(jù)類型
          數(shù)倉(cāng)相關(guān)面試題
          Hive表的基本操作(必會(huì))
          上萬(wàn)字詳解Spark Core
          瀏覽 63
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产黄视频网站 | 亚洲AV无码久久久久网站飞鱼 | 人人草人人搞人人爱 | 伊人久久国产精品视频 | 婷婷精品在线观看 |