徹底搞清Flink中的Window(Flink版本1.8)

窗口
在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個(gè)消息就處理一次,但是有時(shí)我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點(diǎn)擊了我們的網(wǎng)頁(yè)。在這種情況下,我們必須定義一個(gè)窗口,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對(duì)這個(gè)窗口內(nèi)的數(shù)據(jù)進(jìn)行計(jì)算。
Flink 認(rèn)為 Batch 是 Streaming 的一個(gè)特例,所以 Flink 底層引擎是一個(gè)流式引擎,在上面實(shí)現(xiàn)了流處理和批處理。而窗口(window)就是從 Streaming 到 Batch 的一個(gè)橋梁。
一個(gè)Window代表有限對(duì)象的集合。一個(gè)窗口有一個(gè)最大的時(shí)間戳,該時(shí)間戳意味著在其代表的某時(shí)間點(diǎn)——所有應(yīng)該進(jìn)入這個(gè)窗口的元素都已經(jīng)到達(dá)
Window就是用來對(duì)一個(gè)無(wú)限的流設(shè)置一個(gè)有限的集合,在有界的數(shù)據(jù)集上進(jìn)行操作的一種機(jī)制。window又可以分為基于時(shí)間(Time-based)的window以及基于數(shù)量(Count-based)的window。
Flink DataStream API提供了Time和Count的window,同時(shí)增加了基于Session的window。同時(shí),由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用戶自定義window。
窗口的組成
窗口分配器
assignWindows將某個(gè)帶有時(shí)間戳timestamp的元素element分配給一個(gè)或多個(gè)窗口,并返回窗口集合
getDefaultTrigger 返回跟WindowAssigner關(guān)聯(lián)的默認(rèn)觸發(fā)器
getWindowSerializer返回WindowAssigner分配的窗口的序列化器
窗口分配器定義如何將數(shù)據(jù)元分配給窗口。這是通過WindowAssigner 在window(…)(對(duì)于被Keys化流)或windowAll()(對(duì)于非被Keys化流)調(diào)用中指定您的選擇來完成的。
WindowAssigner負(fù)責(zé)將每個(gè)傳入數(shù)據(jù)元分配給一個(gè)或多個(gè)窗口。Flink帶有預(yù)定義的窗口分配器,用于最常見的用例
即翻滾窗口, 滑動(dòng)窗口,會(huì)話窗口和全局窗口。您還可以通過擴(kuò)展WindowAssigner類來實(shí)現(xiàn)自定義窗口分配器。
所有內(nèi)置窗口分配器(全局窗口除外)都根據(jù)時(shí)間為窗口分配數(shù)據(jù)元,這可以是處理時(shí)間或事件時(shí)間。
State
狀態(tài),用來存儲(chǔ)窗口內(nèi)的元素,如果有 AggregateFunction,則存儲(chǔ)的是增量聚合的中間結(jié)果。
窗口函數(shù)
選擇合適的計(jì)算函數(shù),減少開發(fā)代碼量提高系統(tǒng)性能
增量聚合函數(shù)(窗口只維護(hù)狀態(tài))
ReduceFunction
AggregateFunction
FoldFunction
全量聚合函數(shù)(窗口維護(hù)窗口內(nèi)的數(shù)據(jù))
ProcessWindowFunction
全量計(jì)算
支持功能更加靈活
支持狀態(tài)操作
觸發(fā)器

EventTimeTrigger基于事件時(shí)間的觸發(fā)器,對(duì)應(yīng)onEventTime
ProcessingTimeTrigger
基于當(dāng)前系統(tǒng)時(shí)間的觸發(fā)器,對(duì)應(yīng)onProcessingTime
ProcessingTime 有最好的性能和最低的延遲。但在分布式計(jì)算環(huán)境中ProcessingTime具有不確定性,相同數(shù)據(jù)流多次運(yùn)行有可能產(chǎn)生不同的計(jì)算結(jié)果。ContinuousEventTimeTrigger
ContinuousProcessingTimeTrigger
CountTrigger
Trigger確定何時(shí)窗口函數(shù)準(zhǔn)備好處理窗口(由窗口分配器形成)。每個(gè)都有默認(rèn)值。
如果默認(rèn)觸發(fā)器不符合您的需要,您可以使用指定自定義觸發(fā)器。WindowAssignerTriggertrigger(…)觸發(fā)器界面有五種方法可以Trigger對(duì)不同的事件做出反應(yīng):
onElement()為添加到窗口的每個(gè)數(shù)據(jù)元調(diào)用該方法。
onEventTime()在注冊(cè)的事件時(shí)間計(jì)時(shí)器觸發(fā)時(shí)調(diào)用該方法。
onProcessingTime()在注冊(cè)的處理時(shí)間計(jì)時(shí)器觸發(fā)時(shí)調(diào)用該方法。
該onMerge()方法與狀態(tài)觸發(fā)器相關(guān),并且當(dāng)它們的相應(yīng)窗口合并時(shí)合并兩個(gè)觸發(fā)器的狀態(tài),例如當(dāng)使用會(huì)話窗口時(shí)。
最后,該clear()方法在移除相應(yīng)窗口時(shí)執(zhí)行所需的任何動(dòng)作。
默認(rèn)觸發(fā)器
默認(rèn)觸發(fā)器GlobalWindow是NeverTrigger從不觸發(fā)的。因此,在使用時(shí)必須定義自定義觸發(fā)器GlobalWindow。
通過使用trigger()您指定觸發(fā)器會(huì)覆蓋a的默認(rèn)觸發(fā)器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows則不再根據(jù)時(shí)間進(jìn)度獲取窗口,
而是僅按計(jì)數(shù)。現(xiàn)在,如果你想根據(jù)時(shí)間和數(shù)量做出反應(yīng),你必須編寫自己的自定義觸發(fā)器。event-time窗口分配器都有一個(gè)EventTimeTrigger作為默認(rèn)觸發(fā)器。該觸發(fā)器在watermark通過窗口末尾時(shí)出發(fā)。
觸發(fā)器分類
CountTrigger
一旦窗口中的數(shù)據(jù)元數(shù)量超過給定限制,就會(huì)觸發(fā)。所以其觸發(fā)機(jī)制實(shí)現(xiàn)在onElement中
ProcessingTimeTrigger
基于處理時(shí)間的觸發(fā)。
EventTimeTrigger
根據(jù) watermarks 度量的事件時(shí)間進(jìn)度進(jìn)行觸發(fā)。
PurgingTrigger
另一個(gè)觸發(fā)器作為參數(shù)作為參數(shù)并將其轉(zhuǎn)換為清除觸發(fā)器。
其作用是在 Trigger 觸發(fā)窗口計(jì)算之后將窗口的 State 中的數(shù)據(jù)清除。

image-20210202200710573 前兩條數(shù)據(jù)先后于20:01和20:02進(jìn)入窗口,此時(shí) State 中的值更新為3,同時(shí)到了Trigger的觸發(fā)時(shí)間,輸出結(jié)果為3。

image-20210202200733128 由于 PurgingTrigger 的作用,State 中的數(shù)據(jù)會(huì)被清除。

DeltaTrigger
DeltaTrigger 的應(yīng)用
有這樣一個(gè)車輛區(qū)間測(cè)試的需求,車輛每分鐘上報(bào)當(dāng)前位置與車速,每行進(jìn)10公里,計(jì)算區(qū)間內(nèi)最高車速。

觸發(fā)器原型
onElement
onProcessingTime
onEventTime
onMerge
clear
說明
TriggerResult可以是以下之一
CONTINUE 什么都不做
FIRE_AND_PURGE 觸發(fā)計(jì)算,然后清除窗口中的元素
FIRE 觸發(fā)計(jì)算 默認(rèn)情況下,內(nèi)置的觸發(fā)器只返回 FIRE,不會(huì)清除窗口狀態(tài)。
PURGE 清除窗口中的元素
所有的事件時(shí)間窗口分配器都有一個(gè) EventTimeTrigger 作為默認(rèn)觸發(fā)器。一旦 watermark 到達(dá)窗口末尾,這個(gè)觸發(fā)器就會(huì)被觸發(fā)。
全局窗口(GlobalWindow)的默認(rèn)觸發(fā)器是永不會(huì)被觸發(fā)的 NeverTrigger。因此,在使用全局窗口時(shí),必須自定義一個(gè)觸發(fā)器。
通過使用 trigger() 方法指定觸發(fā)器,將會(huì)覆蓋窗口分配器的默認(rèn)觸發(fā)器。例如,如果你為 TumblingEventTimeWindows 指定 CountTrigger,
那么不會(huì)再根據(jù)時(shí)間進(jìn)度觸發(fā)窗口,而只能通過計(jì)數(shù)。目前為止,如果你希望基于時(shí)間以及計(jì)數(shù)進(jìn)行觸發(fā),則必須編寫自己的自定義觸發(fā)器。
窗口的分類
根據(jù)窗口是否調(diào)用keyBy算子key化,分為被Keys化Windows和非被Keys化Windows;

根據(jù)窗口的驅(qū)動(dòng)方式,分為時(shí)間驅(qū)動(dòng)(Time Window)、數(shù)據(jù)驅(qū)動(dòng)(Count Window);
根據(jù)窗口的元素分配方式,分為滾動(dòng)窗口(tumbling windows)、滑動(dòng)窗口(sliding windows)、會(huì)話窗口(session windows)以及全局窗口(global windows)
被Keys化Windows
可以理解為按照原始數(shù)據(jù)流中的某個(gè)key進(jìn)行分類,擁有同一個(gè)key值的數(shù)據(jù)流將為進(jìn)入同一個(gè)window,多個(gè)窗口并行的邏輯流
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
非被Keys化Windows
不做分類,每進(jìn)入一條數(shù)據(jù)即增加一個(gè)窗口,多個(gè)窗口并行,每個(gè)窗口處理1條數(shù)據(jù)
WindowAll 將元素按照某種特性聚集在一起,該函數(shù)不支持并行操作,默認(rèn)的并行度就是1,所以如果使用這個(gè)算子的話需要注意一下性能問題
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
區(qū)別
對(duì)于被Key化的數(shù)據(jù)流,可以將傳入事件的任何屬性用作鍵(此處有更多詳細(xì)信息)。
擁有被Key化的數(shù)據(jù)流將允許您的窗口計(jì)算由多個(gè)任務(wù)并行執(zhí)行,因?yàn)槊總€(gè)邏輯被Key化的數(shù)據(jù)流可以獨(dú)立于其余任務(wù)進(jìn)行處理。
引用相同Keys的所有數(shù)據(jù)元將被發(fā)送到同一個(gè)并行任務(wù)。
Time-Based window(基于時(shí)間的窗口)
每一條記錄來了以后會(huì)根據(jù)時(shí)間屬性值采用不同的window assinger 方法分配給一個(gè)或者多個(gè)窗口,分為滾動(dòng)窗口(Tumbling windows)和滑動(dòng)窗口(Sliding windows)。
EventTime 數(shù)據(jù)本身攜帶的時(shí)間,默認(rèn)的時(shí)間屬性;
ProcessingTime 處理時(shí)間;
IngestionTime 數(shù)據(jù)進(jìn)入flink程序的時(shí)間;
Tumbling windows(滾動(dòng)窗口)
滾動(dòng)窗口下窗口之間不重疊,且窗口長(zhǎng)度是固定的。我們可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows創(chuàng)建一個(gè)基于Event Time或Processing Time的滾動(dòng)時(shí)間窗口。

下面示例以滾動(dòng)時(shí)間窗口(TumblingEventTimeWindows)為例,默認(rèn)模式是TimeCharacteristic.ProcessingTime處理時(shí)間
/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
所以如果使用Event Time即數(shù)據(jù)的實(shí)際產(chǎn)生時(shí)間,需要通過senv.setStreamTimeCharacteristic指定
// 指定使用數(shù)據(jù)的實(shí)際時(shí)間
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 這里減去8小時(shí),表示用UTC世界時(shí)間
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
Sliding windows(滑動(dòng)窗口)
滑動(dòng)窗口以一個(gè)步長(zhǎng)(Slide)不斷向前滑動(dòng),窗口的長(zhǎng)度固定。使用時(shí),我們要設(shè)置Slide和Size。Slide的大小決定了Flink以多大的頻率來創(chuàng)建新的窗口,Slide較小,窗口的個(gè)數(shù)會(huì)很多。Slide小于窗口的Size時(shí),相鄰窗口會(huì)重疊,一個(gè)事件會(huì)被分配到多個(gè)窗口;Slide大于Size,有些事件可能被丟掉。

同理,如果是滑動(dòng)時(shí)間窗口,也是類似的:
// 窗口的大小是10s,每5s滑動(dòng)一次,也就是5s計(jì)算一次
.timeWindow(Time.seconds(10), Time.seconds(5))
這里使用的是timeWindow,通常使用window,那么兩者的區(qū)別是什么呢?
timeWindow其實(shí)判斷時(shí)間的處理模式是ProcessingTime還是SlidingEventTimeWindows,幫我們判斷好了,調(diào)用方法直接傳入(Time size, Time slide)這兩個(gè)參數(shù)就好了,如果是使用.window方法,則需要自己來判斷,就是前者寫法更簡(jiǎn)單一些。
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
Count-Based window (基于計(jì)數(shù)的窗口)
Count Window 是根據(jù)元素個(gè)數(shù)對(duì)數(shù)據(jù)流進(jìn)行分組的,也分滾動(dòng)(tumb)和滑動(dòng)(slide)。
Tumbling Count Window
當(dāng)我們想要每100個(gè)用戶購(gòu)買行為事件統(tǒng)計(jì)購(gòu)買總數(shù),那么每當(dāng)窗口中填滿100個(gè)元素了,就會(huì)對(duì)窗口進(jìn)行計(jì)算,這種窗口我們稱之為翻滾計(jì)數(shù)窗口(Tumbling Count Window),上圖所示窗口大小為3個(gè)。通過使用 DataStream API,我們可以這樣實(shí)現(xiàn):
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the buyCnt sum
.sum(1)
Sliding Count Window
當(dāng)然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是類似的,例如計(jì)算每10個(gè)元素計(jì)算一次最近100個(gè)元素的總和,代碼示例如下。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
會(huì)話(session)窗口
SessionWindow中的Gap是一個(gè)非常重要的概念,它指的是session之間的間隔。
如果session之間的間隔大于指定的間隔,數(shù)據(jù)將會(huì)被劃分到不同的session中。比如,設(shè)定5秒的間隔,0-5屬于一個(gè)session,5-10屬于另一個(gè)session

DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
Global Windows(全局窗口)

總結(jié)
SlidingEventTimeWindows,
SlidingProcessingTimeWindows,
TumblingEventTimeWindows,
TumblingProcessingTimeWindows
基于時(shí)間的滑動(dòng)窗口
SlidingEventTimeWindows
SlidingProcessingTimeWindows
基于時(shí)間的翻滾窗口
TumblingEventTimeWindows
TumblingProcessingTimeWindows
基于計(jì)數(shù)的滑動(dòng)窗口
countWindow(100, 10)
基于計(jì)數(shù)的翻滾窗口
countWindow(100)
會(huì)話窗口
會(huì)話窗口:一條記錄一個(gè)窗口ProcessingTimeSessionWindows
EventTimeSessionWindows
全局窗口(GlobalWindows)
GlobalWindow是一個(gè)全局窗口,被實(shí)現(xiàn)為單例模式。其maxTimestamp被設(shè)置為L(zhǎng)ong.MAX_VALUE。
該類內(nèi)部有一個(gè)靜態(tài)類定義了GlobalWindow的序列化器:Serializer。
延遲
默認(rèn)情況下,當(dāng)水印超過窗口末尾時(shí),會(huì)刪除延遲數(shù)據(jù)元。
但是,F(xiàn)link允許為窗口 算子指定最大允許延遲。允許延遲指定數(shù)據(jù)元在被刪除之前可以延遲多少時(shí)間,并且其默認(rèn)值為0.
在水印通過窗口結(jié)束之后但在通過窗口結(jié)束加上允許的延遲之前到達(dá)的數(shù)據(jù)元,仍然添加到窗口中。
根據(jù)使用的觸發(fā)器,延遲但未丟棄的數(shù)據(jù)元可能會(huì)導(dǎo)致窗口再次觸發(fā)。就是這種情況EventTimeTrigger。
當(dāng)指定允許的延遲大于0時(shí),在水印通過窗口結(jié)束后保持窗口及其內(nèi)容。在這些情況下,當(dāng)遲到但未掉落的數(shù)據(jù)元到達(dá)時(shí),它可能觸發(fā)窗口的另一次觸發(fā)。
這些射擊被稱為late firings,因?yàn)樗鼈兪怯蛇t到事件觸發(fā)的,與之相反的main firing 是窗口的第一次射擊。在會(huì)話窗口的情況下,后期點(diǎn)火可以進(jìn)一步導(dǎo)致窗口的合并,因?yàn)樗鼈兛梢浴皹蚪印眱蓚€(gè)預(yù)先存在的未合并窗口之間的間隙。
后期觸發(fā)發(fā)出的數(shù)據(jù)元應(yīng)該被視為先前計(jì)算的更新結(jié)果,即,您的數(shù)據(jù)流將包含同一計(jì)算的多個(gè)結(jié)果。根據(jù)您的應(yīng)用程序,您需要考慮這些重復(fù)的結(jié)果或?qū)ζ溥M(jìn)行重復(fù)數(shù)據(jù)刪除。
窗口的使用
Flink為每個(gè)窗口創(chuàng)建一個(gè)每個(gè)數(shù)據(jù)元的副本。鑒于此,翻滾窗口保存每個(gè)數(shù)據(jù)元的一個(gè)副本(一個(gè)數(shù)據(jù)元恰好屬于一個(gè)窗口,除非它被延遲)
動(dòng)窗口會(huì)每個(gè)數(shù)據(jù)元?jiǎng)?chuàng)建幾個(gè)復(fù)本,如“ 窗口分配器”部分中所述。因此,尺寸為1天且滑動(dòng)1秒的滑動(dòng)窗口可能不是一個(gè)好主意。ReduceFunction,AggregateFunction并且FoldFunction可以顯著降低存儲(chǔ)要求,因?yàn)樗鼈兗鼻械鼐酆蠑?shù)據(jù)元并且每個(gè)窗口只存儲(chǔ)一個(gè)值。
相反,僅使用 ProcessWindowFunction需要累積所有數(shù)據(jù)元。
Evictor
它剔除元素的時(shí)機(jī)是:在觸發(fā)器觸發(fā)之后,在窗口被處理(apply windowFunction)之前
Flink 的窗口模型允許在窗口分配器和觸發(fā)器之外指定一個(gè)可選的驅(qū)逐器(Evictor)。可以使用 evictor(…) 方法來完成。
驅(qū)逐器能夠在觸發(fā)器觸發(fā)之后,以及在應(yīng)用窗口函數(shù)之前或之后從窗口中移除元素默認(rèn)情況下,所有內(nèi)置的驅(qū)逐器在窗口函數(shù)之前使用
指定驅(qū)逐器可以避免預(yù)聚合(pre-aggregation),因?yàn)榇翱趦?nèi)所有元素必須在應(yīng)用計(jì)算之前傳遞給驅(qū)逐器。
Flink不保證窗口內(nèi)元素的順序。這意味著雖然驅(qū)逐者可以從窗口的開頭移除元素,但這些元素不一定是先到的還是后到的。
內(nèi)置的Evitor
TimeEvitor
以毫秒為單位的時(shí)間間隔作為參數(shù),對(duì)于給定的窗口,找到元素中的最大的時(shí)間戳max_ts,并刪除時(shí)間戳小于max_ts - interval的所有元素。
本質(zhì)上是將罪行的元素選出來
CountEvitor
保持窗口內(nèi)元素?cái)?shù)量符合用戶指定數(shù)量,如果多于用戶指定的數(shù)量,從窗口緩沖區(qū)的開頭丟棄剩余的元素。
DeltaEvitor
使用 DeltaFunction和 一個(gè)閾值,計(jì)算窗口緩沖區(qū)中的最后一個(gè)元素與其余每個(gè)元素之間的 delta 值,并刪除 delta 值大于或等于閾值的元素。
通過定義的DeltaFunction 和 Threshold ,計(jì)算窗口中元素和最新元素的 Delta 值,將Delta 值超過 Threshold的元素刪除
watermark
watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性。
watermark Apache Flink為了處理EventTime 窗口計(jì)算提出的一種機(jī)制,本質(zhì)上也是一種時(shí)間戳,
由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統(tǒng)Event,
與普通數(shù)據(jù)流Event一樣流轉(zhuǎn)到對(duì)應(yīng)的下游算子,接收到Watermark Event的算子以此不斷調(diào)整自己管理的EventTime clock。
算子接收到一個(gè)Watermark時(shí)候,框架知道不會(huì)再有任何小于該Watermark的時(shí)間戳的數(shù)據(jù)元素到來了,所以Watermark可以看做是告訴Apache Flink框架數(shù)據(jù)流已經(jīng)處理到什么位置(時(shí)間維度)的方式。通常基于Event Time的數(shù)據(jù),自身都包含一個(gè)timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。
waterMark 的觸發(fā)時(shí)間機(jī)制(waterMark >= window_end_time)
當(dāng)?shù)谝淮斡|發(fā)之后,以后所有到達(dá)的該窗口的數(shù)據(jù)(遲到數(shù)據(jù))都會(huì)觸發(fā)該窗口
定義允許延遲,所以 waterMark
=window_end_time+allowedLateness 是窗口被關(guān)閉,數(shù)據(jù)被丟棄 對(duì)于out-of-order的數(shù)據(jù),F(xiàn)link可以通過watermark機(jī)制結(jié)合window的操作,來處理一定范圍內(nèi)的亂序數(shù)據(jù),(新進(jìn)來的數(shù)據(jù))晚于前面進(jìn)來的數(shù)據(jù),但是該數(shù)據(jù)所在窗口沒有被觸發(fā),
這個(gè)時(shí)候數(shù)據(jù)還是有效的——EventTime<WaterMark 的對(duì)于out-of-order的數(shù)據(jù),延遲太多
注意,如果不定義允許最大遲到時(shí)間,并且在有很多數(shù)據(jù)遲到的情況下,會(huì)嚴(yán)重影響正確結(jié)果,只要Event Time < watermark時(shí)間就會(huì)觸發(fā)窗口,也就是說遲到的每一條數(shù)據(jù)都會(huì)觸發(fā)
該窗口
產(chǎn)生方式
Punctuated
數(shù)據(jù)流中每一個(gè)遞增的EventTime都會(huì)產(chǎn)生一個(gè)Watermark(其實(shí)是根據(jù)某個(gè)計(jì)算條件來做判斷)。
在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成。
每個(gè)事件都會(huì)攜帶事件,可以根據(jù)該時(shí)間產(chǎn)生一個(gè)watermark 或者可以根據(jù)事件攜帶的其他標(biāo)志——業(yè)務(wù)的結(jié)束標(biāo)志
Periodic - 周期性的(一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個(gè)Watermark。
在實(shí)際的生產(chǎn)中Periodic的方式必須結(jié)合時(shí)間和積累條數(shù)兩個(gè)維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會(huì)有很大的延時(shí)。
背景
流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的
但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。
對(duì)于late element,我們又不能無(wú)限期的等下去,必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了
它表示當(dāng)達(dá)到watermark到達(dá)之后,在watermark之前的數(shù)據(jù)已經(jīng)全部達(dá)到(即使后面還有延遲的數(shù)據(jù)
解決的問題
Watermark的時(shí)間戳可以和Event中的EventTime 一致,也可以自己定義任何合理的邏輯使得Watermark的時(shí)間戳不等于Event中的EventTime,
Event中的EventTime自產(chǎn)生那一刻起就不可以改變了,不受Apache Flink框架控制,
而Watermark的產(chǎn)生是在Apache Flink的Source節(jié)點(diǎn)或?qū)崿F(xiàn)的Watermark生成器計(jì)算產(chǎn)生(如上Apache Flink內(nèi)置的 Periodic Watermark實(shí)現(xiàn)),
Apache Flink內(nèi)部對(duì)單流或多流的場(chǎng)景有統(tǒng)一的Watermark處理。默認(rèn)情況下小于watermark 時(shí)間戳的event 會(huì)被丟棄嗎
多流waterMark
在實(shí)際的流計(jì)算中往往一個(gè)job中會(huì)處理多個(gè)Source的數(shù)據(jù),對(duì)Source的數(shù)據(jù)進(jìn)行GroupBy分組,那么來自不同Source的相同key值會(huì)shuffle到同一個(gè)處理節(jié)點(diǎn),
并攜帶各自的Watermark,Apache Flink內(nèi)部要保證Watermark要保持單調(diào)遞增,多個(gè)Source的Watermark匯聚到一起時(shí)候可能不是單調(diào)自增的Apache Flink內(nèi)部實(shí)現(xiàn)每一個(gè)邊上只能有一個(gè)遞增的Watermark, 當(dāng)出現(xiàn)多流攜帶Eventtime匯聚到一起(GroupBy or Union)時(shí)候,
Apache Flink會(huì)選擇所有流入的Eventtime中最小的一個(gè)向下游流出。從而保證watermark的單調(diào)遞增和保證數(shù)據(jù)的完整性
理解
默認(rèn)情況下watermark 已經(jīng)觸發(fā)過得窗口,即使有新數(shù)據(jù)(遲到)落進(jìn)去不會(huì)被計(jì)算 ,遲到的意思
watermark>=window_n_end_time && window_n_start_time<=vent_time<window_n_end_time(即數(shù)據(jù)屬于這個(gè)窗口)
允許遲到
watermark>=window_n_end_time && watermark<window_n_end_time+lateness && window_n_start_time<=vent_time<window_n_end_time
在 watermark 大于窗口結(jié)束時(shí)間不超過特定延遲范圍時(shí),落在此窗口內(nèi)的數(shù)據(jù)是有效的,可以觸發(fā)窗口。
窗口聚合
增量聚合
窗口內(nèi)來一條數(shù)據(jù)就計(jì)算一次
全量聚合
一次計(jì)算整個(gè)窗口里的所有元素(可以進(jìn)行排序,一次一批可以針對(duì)外部鏈接)
使用
窗口之后調(diào)用 apply ,創(chuàng)建的元素里面方法的參數(shù)是一個(gè)迭代器
常用的一些方法
window
timeWindow和 countWind
process 和 apply
AssignerWithPeriodicWatermarks或接口AssignerWithPunctuatedWatermarks。
簡(jiǎn)而言之,前一個(gè)接口將會(huì)周期性發(fā)送Watermark,而第二個(gè)接口根據(jù)一些到達(dá)數(shù)據(jù)的屬性,例如一旦在流中碰到一個(gè)特殊的element便發(fā)送Watermark。
自定義窗口
Window Assigner:負(fù)責(zé)將元素分配到不同的window。
Trigger即觸發(fā)器,定義何時(shí)或什么情況下Fire一個(gè)window。
對(duì)于CountWindow,我們可以直接使用已經(jīng)定義好的Trigger:CountTrigger trigger(CountTrigger.of(2))
Evictor(可選) 驅(qū)逐者,即保留上一window留下的某些元素。
最簡(jiǎn)單的情況,如果業(yè)務(wù)不是特別復(fù)雜,僅僅是基于Time和Count,我們其實(shí)可以用系統(tǒng)定義好的WindowAssigner以及Trigger和Evictor來實(shí)現(xiàn)不同的組合:
window 出現(xiàn)數(shù)據(jù)傾斜
window 產(chǎn)生數(shù)據(jù)傾斜指的是數(shù)據(jù)在不同的窗口內(nèi)堆積的數(shù)據(jù)量相差過多。本質(zhì)上產(chǎn)生這種情況的原因是數(shù)據(jù)源頭發(fā)送的數(shù)據(jù)量速度不同導(dǎo)致的。出現(xiàn)這種情況一般通過兩種方式來解決:
在數(shù)據(jù)進(jìn)入窗口前做預(yù)聚合;
重新設(shè)計(jì)窗口聚合的 key;
