一網(wǎng)打盡Flink中的時間、窗口和流Join
點擊上方藍色字體,選擇“設(shè)為星標”
回復(fù)”面試“獲取更多驚喜

首先,我們會學(xué)習(xí)如何定義時間屬性,時間戳和水位線。然后我們將會學(xué)習(xí)底層操作process function,它可以讓我們訪問時間戳和水位線,以及注冊定時器事件。接下來,我們將會使用Flink的window API,它提供了通常使用的各種窗口類型的內(nèi)置實現(xiàn)。我們將會學(xué)到如何進行用戶自定義窗口操作符,以及窗口的核心功能:assigners(分配器)、triggers(觸發(fā)器)和evictors(清理器)。最后,我們將討論如何基于時間來做流的聯(lián)結(jié)查詢,以及處理遲到事件的策略。
時間操作
1 設(shè)置時間屬性
如果我們想要在分布式流處理應(yīng)用程序中定義有關(guān)時間的操作,徹底理解時間的語義是非常重要的。當(dāng)我們指定了一個窗口去收集某1分鐘內(nèi)的數(shù)據(jù)時,這個長度為1分鐘的桶中,到底應(yīng)該包含哪些數(shù)據(jù)?在DataStream API中,我們將使用時間屬性來告訴Flink:當(dāng)我們創(chuàng)建窗口時,我們?nèi)绾味x時間。時間屬性是StreamExecutionEnvironment的一個屬性,有以下值:
ProcessingTime
機器時間在分布式系統(tǒng)中又叫做“墻上時鐘”。
當(dāng)操作符執(zhí)行時,此操作符看到的時間是操作符所在機器的機器時間。Processing-time window的觸發(fā)取決于機器時間,窗口包含的元素也是那個機器時間段內(nèi)到達的元素。通常情況下,窗口操作符使用processing time會導(dǎo)致不確定的結(jié)果,因為基于機器時間的窗口中收集的元素取決于元素到達的速度快慢。使用processing time會為程序提供極低的延遲,因為無需等待水位線的到達。
如果要追求極限的低延遲,請使用processing time。
EventTime
當(dāng)操作符執(zhí)行時,操作符看的當(dāng)前時間是由流中元素所攜帶的信息決定的。流中的每一個元素都必須包含時間戳信息。而系統(tǒng)的邏輯時鐘由水位線(Watermark)定義。我們之前學(xué)習(xí)過,時間戳要么在事件進入流處理程序之前已經(jīng)存在,要么就需要在程序的數(shù)據(jù)源(source)處進行分配。當(dāng)水位線宣布特定時間段的數(shù)據(jù)都已經(jīng)到達,事件時間窗口將會被觸發(fā)計算。即使數(shù)據(jù)到達的順序是亂序的,事件時間窗口的計算結(jié)果也將是確定性的。窗口的計算結(jié)果并不取決于元素到達的快與慢。
當(dāng)水位線超過事件時間窗口的結(jié)束時間時,窗口將會閉合,不再接收數(shù)據(jù),并觸發(fā)計算。
IngestionTime
當(dāng)事件進入source操作符時,source操作符所在機器的機器時間,就是此事件的“攝入時間”(IngestionTime),并同時產(chǎn)生水位線。IngestionTime相當(dāng)于EventTime和ProcessingTime的混合體。一個事件的IngestionTime其實就是它進入流處理器中的時間。
IngestionTime沒什么價值,既有EventTime的執(zhí)行效率(比較低),有沒有EventTime計算結(jié)果的準確性。
下面的例子展示了如何設(shè)置事件時間。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<SensorReading> sensorData = env.addSource(...);
如果要使用processing time,將TimeCharacteristic.EventTime替換為TimeCharacteristic.ProcessingTIme就可以了。
1.1 指定時間戳和產(chǎn)生水位線
如果使用事件時間,那么流中的事件必須包含這個事件真正發(fā)生的時間。使用了事件時間的流必須攜帶水位線。
時間戳和水位線的單位是毫秒,記時從1970-01-01T00:00:00Z開始。到達某個操作符的水位線就會告知這個操作符:小于等于水位線中攜帶的時間戳的事件都已經(jīng)到達這個操作符了。時間戳和水位線可以由SourceFunction產(chǎn)生,或者由用戶自定義的時間戳分配器和水位線產(chǎn)生器來生成。
Flink暴露了TimestampAssigner接口供我們實現(xiàn),使我們可以自定義如何從事件數(shù)據(jù)中抽取時間戳。一般來說,時間戳分配器需要在source操作符后馬上進行調(diào)用。
因為時間戳分配器看到的元素的順序應(yīng)該和source操作符產(chǎn)生數(shù)據(jù)的順序是一樣的,否則就亂了。這就是為什么我們經(jīng)常將source操作符的并行度設(shè)置為1的原因。
也就是說,任何分區(qū)操作都會將元素的順序打亂,例如:并行度改變,keyBy()操作等等。
所以最佳實踐是:在盡量接近數(shù)據(jù)源source操作符的地方分配時間戳和產(chǎn)生水位線,甚至最好在SourceFunction中分配時間戳和產(chǎn)生水位線。當(dāng)然在分配時間戳和產(chǎn)生水位線之前可以對流進行map和filter操作是沒問題的,也就是說必須是窄依賴。
以下這種寫法是可以的。
DataStream<T> stream = env
.addSource(...)
.map(...)
.filter(...)
.assignTimestampsAndWatermarks(...)
下面的例子展示了首先filter流,然后再分配時間戳和水位線。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;
// 從調(diào)用時刻開始給env創(chuàng)建的每一個stream追加時間特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<SensorReading> readings = env
.addSource(new SensorSource)
.filter(r -> r.temperature > 25)
.assignTimestampsAndWatermarks(new MyAssigner());
MyAssigner有兩種類型
AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks
以上兩個接口都繼承自TimestampAssigner。
1.2 周期性的生成水位線
周期性的生成水位線:系統(tǒng)會周期性的將水位線插入到流中(水位線也是一種特殊的事件!)。默認周期是200毫秒,也就是說,系統(tǒng)會每隔200毫秒就往流中插入一次水位線。
這里的200毫秒是機器時間!
可以使用ExecutionConfig.setAutoWatermarkInterval()方法進行設(shè)置。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每隔5秒產(chǎn)生一個水位線
env.getConfig.setAutoWatermarkInterval(5000)
上面的例子產(chǎn)生水位線的邏輯:每隔5秒鐘,F(xiàn)link會調(diào)用AssignerWithPeriodicWatermarks中的getCurrentWatermark()方法。如果方法返回的時間戳大于之前水位線的時間戳,新的水位線會被插入到流中。這個檢查保證了水位線是單調(diào)遞增的。如果方法返回的時間戳小于等于之前水位線的時間戳,則不會產(chǎn)生新的水位線。
例子,自定義一個周期性的時間戳抽取
scala version
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
val bound = 60 * 1000 // 延時為1分鐘
var maxTs = Long.MinValue + bound + 1 // 觀察到的最大時間戳
override def getCurrentWatermark: Watermark {
new Watermark(maxTs - bound - 1)
}
override def extractTimestamp(r: SensorReading, previousTS: Long) {
maxTs = maxTs.max(r.timestamp)
r.timestamp
}
}
java version
.assignTimestampsAndWatermarks(
// generate periodic watermarks
new AssignerWithPeriodicWatermarks[(String, Long)] {
val bound = 10 * 1000L // 最大延遲時間
var maxTs = Long.MinValue + bound + 1 // 當(dāng)前觀察到的最大時間戳
// 用來生成水位線
// 默認200ms調(diào)用一次
override def getCurrentWatermark: Watermark = {
println("generate watermark!!!" + (maxTs - bound - 1) + "ms")
new Watermark(maxTs - bound - 1)
}
// 每來一條數(shù)據(jù)都會調(diào)用一次
override def extractTimestamp(t: (String, Long), l: Long): Long = {
println("extract timestamp!!!")
maxTs = maxTs.max(t._2) // 更新觀察到的最大事件時間
t._2 // 抽取時間戳
}
}
)
如果我們事先得知數(shù)據(jù)流的時間戳是單調(diào)遞增的,也就是說沒有亂序。我們可以使用assignAscendingTimestamps,方法會直接使用數(shù)據(jù)的時間戳生成水位線。
scala version
val stream = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps(e => e.timestamp)
java version
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<SensorReading>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<SensorReading>() {
@Override
public long extractTimestamp(SensorReading r, long l) {
return r.timestamp;
}
})
)
如果我們能大致估算出數(shù)據(jù)流中的事件的最大延遲時間,可以使用如下代碼:
最大延遲時間就是當(dāng)前到達的事件的事件時間和之前所有到達的事件中最大時間戳的差。
scala version
.assignTimestampsAndWatermarks(
// 水位線策略;默認200ms的機器時間插入一次水位線
// 水位線 = 當(dāng)前觀察到的事件所攜帶的最大時間戳 - 最大延遲時間
WatermarkStrategy
// 最大延遲時間設(shè)置為5s
.forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
// 告訴系統(tǒng)第二個字段是時間戳,時間戳的單位是毫秒
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2
})
)
java version
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
})
)
以上代碼設(shè)置了最大延遲時間為5秒。
1.3 如何產(chǎn)生不規(guī)則的水位線
有時候輸入流中會包含一些用于指示系統(tǒng)進度的特殊元組或標記。Flink為此類情形以及可根據(jù)輸入元素生成水位線的情形提供了AssignerWithPunctuatedWatermarks接口。該接口中的checkAndGetNextWatermark()方法會在針對每個事件的extractTimestamp()方法后立即調(diào)用。它可以決定是否生成一個新的水位線。如果該方法返回一個非空、且大于之前值的水位線,算子就會將這個新水位線發(fā)出。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
val bound = 60 * 1000
// 每來一條數(shù)據(jù)就調(diào)用一次
// 緊跟`extractTimestamp`函數(shù)調(diào)用
override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long) {
if (r.id == "sensor_1") {
// 抽取的時間戳 - 最大延遲時間
new Watermark(extractedTS - bound)
} else {
null
}
}
// 每來一條數(shù)據(jù)就調(diào)用一次
override def extractTimestamp(r: SensorReading, previousTS: Long) {
r.timestamp
}
}
現(xiàn)在我們已經(jīng)知道如何使用 TimestampAssigner 來產(chǎn)生水位線了?,F(xiàn)在我們要討論一下水位線會對我們的程序產(chǎn)生什么樣的影響。
水位線用來平衡延遲和計算結(jié)果的正確性。水位線告訴我們,在觸發(fā)計算(例如關(guān)閉窗口并觸發(fā)窗口計算)之前,我們需要等待事件多長時間?;谑录r間的操作符根據(jù)水位線來衡量系統(tǒng)的邏輯時間的進度。
完美的水位線永遠不會錯:時間戳小于水位線的事件不會再出現(xiàn)。在特殊情況下(例如非亂序事件流),最近一次事件的時間戳就可能是完美的水位線。啟發(fā)式水位線則相反,它只估計時間,因此有可能出錯,即遲到的事件(其時間戳小于水位線標記時間)晚于水位線出現(xiàn)。針對啟發(fā)式水位線,F(xiàn)link提供了處理遲到元素的機制。
設(shè)定水位線通常需要用到領(lǐng)域知識。舉例來說,如果知道事件的遲到時間不會超過5秒,就可以將水位線標記時間設(shè)為收到的最大時間戳減去5秒。另一種做法是,采用一個Flink作業(yè)監(jiān)控事件流,學(xué)習(xí)事件的遲到規(guī)律,并以此構(gòu)建水位線生成模型。
如果最大延遲時間設(shè)置的很大,計算出的結(jié)果會更精確,但收到計算結(jié)果的速度會很慢,同時系統(tǒng)會緩存大量的數(shù)據(jù),并對系統(tǒng)造成比較大的壓力。如果最大延遲時間設(shè)置的很小,那么收到計算結(jié)果的速度會很快,但可能收到錯誤的計算結(jié)果。不過Flink處理遲到數(shù)據(jù)的機制可以解決這個問題。上述問題看起來很復(fù)雜,但是恰恰符合現(xiàn)實世界的規(guī)律:大部分真實的事件流都是亂序的,并且通常無法了解它們的亂序程度(因為理論上不能預(yù)見未來)。水位線是唯一讓我們直面亂序事件流并保證正確性的機制; 否則只能選擇忽視事實,假裝錯誤的結(jié)果是正確的。
思考題一:實時程序,要求實時性非常高,并且結(jié)果并不一定要求非常準確,那么應(yīng)該怎么辦?
回答:直接使用處理時間。
思考題二:如果要進行時間旅行,也就是要還原以前的數(shù)據(jù)集當(dāng)時的流的狀態(tài),應(yīng)該怎么辦?
回答:使用事件時間。使用Hive將數(shù)據(jù)集先按照時間戳升序排列,再將最大延遲時間設(shè)置為0。
2 處理函數(shù)
我們之前學(xué)習(xí)的轉(zhuǎn)換算子是無法訪問事件的時間戳信息和水位線信息的。而這在一些應(yīng)用場景下,極為重要。例如MapFunction這樣的map轉(zhuǎn)換算子就無法訪問時間戳或者當(dāng)前事件的事件時間。
基于此,DataStream API提供了一系列的Low-Level轉(zhuǎn)換算子??梢栽L問時間戳、水位線以及注冊定時事件。還可以輸出特定的一些事件,例如超時事件等。Process Function用來構(gòu)建事件驅(qū)動的應(yīng)用以及實現(xiàn)自定義的業(yè)務(wù)邏輯(使用之前的window函數(shù)和轉(zhuǎn)換算子無法實現(xiàn))。例如,F(xiàn)link-SQL就是使用Process Function實現(xiàn)的。
Flink提供了8個Process Function:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
我們這里詳細介紹一下KeyedProcessFunction。
KeyedProcessFunction用來操作KeyedStream。KeyedProcessFunction會處理流的每一個元素,輸出為0個、1個或者多個元素。所有的Process Function都繼承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個方法:
processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一個元素都會調(diào)用這個方法,調(diào)用結(jié)果將會放在Collector數(shù)據(jù)類型中輸出。Context可以訪問元素的時間戳,元素的key,以及TimerService時間服務(wù)。Context還可以將結(jié)果輸出到別的流(side outputs)。
onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一個回調(diào)函數(shù)。當(dāng)之前注冊的定時器觸發(fā)時調(diào)用。參數(shù)timestamp為定時器所設(shè)定的觸發(fā)的時間戳。Collector為輸出結(jié)果的集合。OnTimerContext和processElement的Context參數(shù)一樣,提供了上下文的一些信息,例如firing trigger的時間信息(事件時間或者處理時間)。
2.1 時間服務(wù)和定時器
Context和OnTimerContext所持有的TimerService對象擁有以下方法:
currentProcessingTime(): Long 返回當(dāng)前處理時間
currentWatermark(): Long 返回當(dāng)前水位線的時間戳
registerProcessingTimeTimer(timestamp: Long): Unit 會注冊當(dāng)前key的processing time的timer。當(dāng)processing time到達定時時間時,觸發(fā)timer。
registerEventTimeTimer(timestamp: Long): Unit 會注冊當(dāng)前key的event time timer。當(dāng)水位線大于等于定時器注冊的時間時,觸發(fā)定時器執(zhí)行回調(diào)函數(shù)。
deleteProcessingTimeTimer(timestamp: Long): Unit 刪除之前注冊處理時間定時器。如果沒有這個時間戳的定時器,則不執(zhí)行。
deleteEventTimeTimer(timestamp: Long): Unit 刪除之前注冊的事件時間定時器,如果沒有此時間戳的定時器,則不執(zhí)行。
當(dāng)定時器timer觸發(fā)時,執(zhí)行回調(diào)函數(shù)onTimer()。processElement()方法和onTimer()方法是同步(不是異步)方法,這樣可以避免并發(fā)訪問和操作狀態(tài)。
針對每一個key和timestamp,只能注冊一個定期器。也就是說,每一個key可以注冊多個定時器,但在每一個時間戳只能注冊一個定時器。KeyedProcessFunction默認將所有定時器的時間戳放在一個優(yōu)先隊列中。在Flink做檢查點操作時,定時器也會被保存到狀態(tài)后端中。
舉個例子說明KeyedProcessFunction如何操作KeyedStream。
下面的程序展示了如何監(jiān)控溫度傳感器的溫度值,如果溫度值在一秒鐘之內(nèi)(processing time)連續(xù)上升,報警。
scala version
val warnings = readings
.keyBy(r => r.id)
.process(new TempIncreaseAlertFunction)
class TempIncrease extends KeyedProcessFunction[String, SensorReading, String] {
// 懶加載;
// 狀態(tài)變量會在檢查點操作時進行持久化,例如hdfs
// 只會初始化一次,單例模式
// 在當(dāng)機重啟程序時,首先去持久化設(shè)備尋找名為`last-temp`的狀態(tài)變量,如果存在,則直接讀取。不存在,則初始化。
// 用來保存最近一次溫度
// 默認值是0.0
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
new ValueStateDescriptor[Double]("last-temp", Types.of[Double])
)
// 默認值是0L
lazy val timer: ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer", Types.of[Long])
)
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// 使用`.value()`方法取出最近一次溫度值,如果來的溫度是第一條溫度,則prevTemp為0.0
val prevTemp = lastTemp.value()
// 將到來的這條溫度值存入狀態(tài)變量中
lastTemp.update(value.temperature)
// 如果timer中有定時器的時間戳,則讀取
val ts = timer.value()
if (prevTemp == 0.0 || value.temperature < prevTemp) {
ctx.timerService().deleteProcessingTimeTimer(ts)
timer.clear()
} else if (value.temperature > prevTemp && ts == 0) {
val oneSecondLater = ctx.timerService().currentProcessingTime() + 1000L
ctx.timerService().registerProcessingTimeTimer(oneSecondLater)
timer.update(oneSecondLater)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect("傳感器ID是 " + ctx.getCurrentKey + " 的傳感器的溫度連續(xù)1s上升了!")
timer.clear()
}
}
java version
DataStream<String> warings = readings
.keyBy(r -> r.id)
.process(new TempIncreaseAlertFunction());
看一下TempIncreaseAlertFunction如何實現(xiàn), 程序中使用了ValueState這樣一個狀態(tài)變量, 后面會詳細講解。
public static class TempIncreaseAlertFunction extends KeyedProcessFunction<String, SensorReading, String> {
private ValueState<Double> lastTemp;
private ValueState<Long> currentTimer;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
lastTemp = getRuntimeContext().getState(
new ValueStateDescriptor<>("last-temp", Types.DOUBLE)
);
currentTimer = getRuntimeContext().getState(
new ValueStateDescriptor<>("current-timer", Types.LONG)
);
}
@Override
public void processElement(SensorReading r, Context ctx, Collector<String> out) throws Exception {
// 取出上一次的溫度
Double prevTemp = 0.0;
if (lastTemp.value() != null) {
prevTemp = lastTemp.value();
}
// 將當(dāng)前溫度更新到上一次的溫度這個變量中
lastTemp.update(r.temperature);
Long curTimerTimestamp = 0L;
if (currentTimer.value() != null) {
curTimerTimestamp = currentTimer.value();
}
if (prevTemp == 0.0 || r.temperature < prevTemp) {
// 溫度下降或者是第一個溫度值,刪除定時器
ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
// 清空狀態(tài)變量
currentTimer.clear();
} else if (r.temperature > prevTemp && curTimerTimestamp == 0) {
// 溫度上升且我們并沒有設(shè)置定時器
long timerTs = ctx.timerService().currentProcessingTime() + 1000L;
ctx.timerService().registerProcessingTimeTimer(timerTs);
// 保存定時器時間戳
currentTimer.update(timerTs);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
out.collect("傳感器id為: "
+ ctx.getCurrentKey()
+ "的傳感器溫度值已經(jīng)連續(xù)1s上升了。");
currentTimer.clear();
}
}
2.2 將事件發(fā)送到側(cè)輸出
大部分的DataStream API的算子的輸出是單一輸出,也就是某種數(shù)據(jù)類型的流。除了split算子,可以將一條流分成多條流,這些流的數(shù)據(jù)類型也都相同。process function的side outputs功能可以產(chǎn)生多條流,并且這些流的數(shù)據(jù)類型可以不一樣。一個side output可以定義為OutputTag[X]對象,X是輸出流的數(shù)據(jù)類型。process function可以通過Context對象發(fā)射一個事件到一個或者多個side outputs。
例子
scala version
object SideOutputExample {
val output = new OutputTag[String]("side-output")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new SensorSource)
val warnings = stream
.process(new FreezingAlarm)
warnings.print() // 打印主流
warnings.getSideOutput(output).print() // 打印側(cè)輸出流
env.execute()
}
class FreezingAlarm extends ProcessFunction[SensorReading, SensorReading] {
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if (value.temperature < 32.0) {
ctx.output(output, "傳感器ID為:" + value.id + "的傳感器溫度小于32度!")
}
out.collect(value)
}
}
}
java version
public class SideOutputExample {
private static OutputTag<String> output = new OutputTag<String>("side-output"){};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<SensorReading> stream = env.addSource(new SensorSource());
SingleOutputStreamOperator<SensorReading> warnings = stream
.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
if (value.temperature < 32) {
ctx.output(output, "溫度小于32度!");
}
out.collect(value);
}
});
warnings.print();
warnings.getSideOutput(output).print();
env.execute();
}
}
2.3 CoProcessFunction
對于兩條輸入流,DataStream API提供了CoProcessFunction這樣的low-level操作。CoProcessFunction提供了操作每一個輸入流的方法: processElement1()和processElement2()。類似于ProcessFunction,這兩種方法都通過Context對象來調(diào)用。這個Context對象可以訪問事件數(shù)據(jù),定時器時間戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回調(diào)函數(shù)。下面的例子展示了如何使用CoProcessFunction來合并兩條流。
scala version
object SensorSwitch {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new SensorSource).keyBy(r => r.id)
val switches = env.fromElements(("sensor_2", 10 * 1000L)).keyBy(r => r._1)
stream
.connect(switches)
.process(new SwitchProcess)
.print()
env.execute()
}
class SwitchProcess extends CoProcessFunction[SensorReading, (String, Long), SensorReading] {
lazy val forwardSwitch = getRuntimeContext.getState(
new ValueStateDescriptor[Boolean]("switch", Types.of[Boolean])
)
override def processElement1(value: SensorReading, ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if (forwardSwitch.value()) {
out.collect(value)
}
}
override def processElement2(value: (String, Long), ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context, out: Collector[SensorReading]): Unit = {
forwardSwitch.update(true)
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + value._2)
}
override def onTimer(timestamp: Long, ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#OnTimerContext, out: Collector[SensorReading]): Unit = {
forwardSwitch.clear()
}
}
}
java version
public class SensorSwitch {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KeyedStream<SensorReading, String> stream = env
.addSource(new SensorSource())
.keyBy(r -> r.id);
KeyedStream<Tuple2<String, Long>, String> switches = env
.fromElements(Tuple2.of("sensor_2", 10 * 1000L))
.keyBy(r -> r.f0);
stream
.connect(switches)
.process(new SwitchProcess())
.print();
env.execute();
}
public static class SwitchProcess extends CoProcessFunction<SensorReading, Tuple2<String, Long>, SensorReading> {
private ValueState<Boolean> forwardingEnabled;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
forwardingEnabled = getRuntimeContext().getState(
new ValueStateDescriptor<>("filterSwitch", Types.BOOLEAN)
);
}
@Override
public void processElement1(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
if (forwardingEnabled.value() != null && forwardingEnabled.value()) {
out.collect(value);
}
}
@Override
public void processElement2(Tuple2<String, Long> value, Context ctx, Collector<SensorReading> out) throws Exception {
forwardingEnabled.update(true);
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + value.f1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<SensorReading> out) throws Exception {
super.onTimer(timestamp, ctx, out);
forwardingEnabled.clear();
}
}
}
窗口操作
1 窗口操作符
窗口操作是流處理程序中很常見的操作。窗口操作允許我們在無限流上的一段有界區(qū)間上面做聚合之類的操作。而我們使用基于時間的邏輯來定義區(qū)間。窗口操作符提供了一種將數(shù)據(jù)放進一個桶,并根據(jù)桶中的數(shù)據(jù)做計算的方法。例如,我們可以將事件放進5分鐘的滾動窗口中,然后計數(shù)。
無限流轉(zhuǎn)化成有限數(shù)據(jù)的方法:使用窗口。
1.1 定義窗口操作符
Window算子可以在keyed stream或者nokeyed stream上面使用。
創(chuàng)建一個Window算子,需要指定兩個部分:
window assigner定義了流的元素如何分配到window中。window assigner將會產(chǎn)生一條WindowedStream(或者AllWindowedStream,如果是nonkeyed DataStream的話)
window function用來處理WindowedStream(AllWindowedStream)中的元素。
下面的代碼說明了如何使用窗口操作符。
stream
.keyBy(...)
.window(...) // 指定window assigner
.reduce/aggregate/process(...) // 指定window function
stream
.windowAll(...) // 指定window assigner
.reduce/aggregate/process(...) // 指定window function
我們的學(xué)習(xí)重點是Keyed WindowedStream。
1.2 內(nèi)置的窗口分配器
窗口分配器將會根據(jù)事件的事件時間或者處理時間來將事件分配到對應(yīng)的窗口中去。窗口包含開始時間和結(jié)束時間這兩個時間戳。
所有的窗口分配器都包含一個默認的觸發(fā)器:
對于事件時間:當(dāng)水位線超過窗口結(jié)束時間,觸發(fā)窗口的求值操作。
對于處理時間:當(dāng)機器時間超過窗口結(jié)束時間,觸發(fā)窗口的求值操作。
需要注意的是:當(dāng)處于某個窗口的第一個事件到達的時候,這個窗口才會被創(chuàng)建。Flink不會對空窗口求值。
Flink創(chuàng)建的窗口類型是TimeWindow,包含開始時間和結(jié)束時間,區(qū)間是左閉右開的,也就是說包含開始時間戳,不包含結(jié)束時間戳。
滾動窗口(tumbling windows)

DataStream<SensorReading> sensorData = ...
DataStream<T> avgTemp = sensorData
.keyBy(r -> r.id)
// group readings in 1s event-time windows
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.process(new TemperatureAverager);
DataStream<T> avgTemp = sensorData
.keyBy(r -> r.id)
// group readings in 1s processing-time windows
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.process(new TemperatureAverager);
// 其實就是之前的
// shortcut for window.(TumblingEventTimeWindows.of(size))
DataStream<T> avgTemp = sensorData
.keyBy(r -> r.id)
.timeWindow(Time.seconds(1))
.process(new TemperatureAverager);
默認情況下,滾動窗口會和1970-01-01-00:00:00.000對齊,例如一個1小時的滾動窗口將會定義以下開始時間的窗口:00:00:00,01:00:00,02:00:00,等等。
滑動窗口(sliding window)
對于滑動窗口,我們需要指定窗口的大小和滑動的步長。當(dāng)滑動步長小于窗口大小時,窗口將會出現(xiàn)重疊,而元素會被分配到不止一個窗口中去。當(dāng)滑動步長大于窗口大小時,一些元素可能不會被分配到任何窗口中去,會被直接丟棄。
下面的代碼定義了窗口大小為1小時,滑動步長為15分鐘的窗口。每一個元素將被分配到4個窗口中去。

DataStream<T> slidingAvgTemp = sensorData
.keyBy(r -> r.id)
.window(
SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15))
)
.process(new TemperatureAverager);
DataStream<T> slidingAvgTemp = sensorData
.keyBy(r -> r.id)
.window(
SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(15))
)
.process(new TemperatureAverager);
DataStream<T> slidingAvgTemp = sensorData
.keyBy(r -> r.id)
.timeWindow(Time.hours(1), Time.minutes(15))
.process(new TemperatureAverager);
會話窗口(session windows)
會話窗口不可能重疊,并且會話窗口的大小也不是固定的。不活躍的時間長度定義了會話窗口的界限。不活躍的時間是指這段時間沒有元素到達。下圖展示了元素如何被分配到會話窗口。

DataStream<T> sessionWindows = sensorData
.keyBy(r -> r.id)
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
.process(...);
DataStream<T> sessionWindows = sensorData
.keyBy(r -> r.id)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))
.process(...);
由于會話窗口的開始時間和結(jié)束時間取決于接收到的元素,所以窗口分配器無法立即將所有的元素分配到正確的窗口中去。相反,會話窗口分配器最開始時先將每一個元素分配到它自己獨有的窗口中去,窗口開始時間是這個元素的時間戳,窗口大小是session gap的大小。接下來,會話窗口分配器會將出現(xiàn)重疊的窗口合并成一個窗口。
1.3 調(diào)用窗口計算函數(shù)
window functions定義了窗口中數(shù)據(jù)的計算邏輯。有兩種計算邏輯:
增量聚合函數(shù)(Incremental aggregation functions):當(dāng)一個事件被添加到窗口時,觸發(fā)函數(shù)計算,并且更新window的狀態(tài)(單個值)。最終聚合的結(jié)果將作為輸出。ReduceFunction和AggregateFunction是增量聚合函數(shù)。
全窗口函數(shù)(Full window functions):這個函數(shù)將會收集窗口中所有的元素,可以做一些復(fù)雜計算。ProcessWindowFunction是window function。
ReduceFunction
例子: 計算每個傳感器15s窗口中的溫度最小值
scala version
val minTempPerWindow = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
java version
DataStream<Tuple2<String, Double>> minTempPerwindow = sensorData
.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return Tuple2.of(value.id, value.temperature);
}
})
.keyBy(r -> r.f0)
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) throws Exception {
if (value1.f1 < value2.f1) {
return value1;
} else {
return value2;
}
}
})
AggregateFunction
先來看接口定義
public interface AggregateFunction<IN, ACC, OUT>
extends Function, Serializable {
// create a new accumulator to start a new aggregate
ACC createAccumulator();
// add an input element to the accumulator and return the accumulator
ACC add(IN value, ACC accumulator);
// compute the result from the accumulator and return it.
OUT getResult(ACC accumulator);
// merge two accumulators and return the result.
ACC merge(ACC a, ACC b);
}
IN是輸入元素的類型,ACC是累加器的類型,OUT是輸出元素的類型。
例子
val avgTempPerWindow: DataStream[(String, Double)] = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.aggregate(new AvgTempFunction)
// An AggregateFunction to compute the average temperature per sensor.
// The accumulator holds the sum of temperatures and an event count.
class AvgTempFunction
extends AggregateFunction[(String, Double),
(String, Double, Int), (String, Double)] {
override def createAccumulator() = {
("", 0.0, 0)
}
override def add(in: (String, Double), acc: (String, Double, Int)) = {
(in._1, in._2 + acc._2, 1 + acc._3)
}
override def getResult(acc: (String, Double, Int)) = {
(acc._1, acc._2 / acc._3)
}
override def merge(acc1: (String, Double, Int),
acc2: (String, Double, Int)) = {
(acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
}
ProcessWindowFunction
一些業(yè)務(wù)場景,我們需要收集窗口內(nèi)所有的數(shù)據(jù)進行計算,例如計算窗口數(shù)據(jù)的中位數(shù),或者計算窗口數(shù)據(jù)中出現(xiàn)頻率最高的值。這樣的需求,使用ReduceFunction和AggregateFunction就無法實現(xiàn)了。這個時候就需要ProcessWindowFunction了。
先來看接口定義
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
extends AbstractRichFunction {
// Evaluates the window
void process(KEY key, Context ctx, Iterable<IN> vals, Collector<OUT> out)
throws Exception;
// Deletes any custom per-window state when the window is purged
public void clear(Context ctx) throws Exception {}
// The context holding window metadata
public abstract class Context implements Serializable {
// Returns the metadata of the window
public abstract W window();
// Returns the current processing time
public abstract long currentProcessingTime();
// Returns the current event-time watermark
public abstract long currentWatermark();
// State accessor for per-window state
public abstract KeyedStateStore windowState();
// State accessor for per-key global state
public abstract KeyedStateStore globalState();
// Emits a record to the side output identified by the OutputTag.
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
process()方法接受的參數(shù)為:
window的key
Iterable迭代器包含窗口的所有元素
Collector用于輸出結(jié)果流。
Context參數(shù)和別的process方法一樣。而ProcessWindowFunction的Context對象還可以訪問window的元數(shù)據(jù)(窗口開始和結(jié)束時間),當(dāng)前處理時間和水位線,per-window state和per-key global state,side outputs。
per-window state: 用于保存一些信息,這些信息可以被process()訪問,只要process所處理的元素屬于這個窗口。
per-key global state: 同一個key,也就是在一條KeyedStream上,不同的window可以訪問per-key global state保存的值。
例子:計算5s滾動窗口中的最低和最高的溫度。輸出的元素包含了(流的Key, 最低溫度, 最高溫度, 窗口結(jié)束時間)。
val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.process(new HighAndLowTempProcessFunction)
case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)
class HighAndLowTempProcessFunction
extends ProcessWindowFunction[SensorReading,
MinMaxTemp, String, TimeWindow] {
override def process(key: String,
ctx: Context,
vals: Iterable[SensorReading],
out: Collector[MinMaxTemp]): Unit = {
val temps = vals.map(_.temperature)
val windowEnd = ctx.window.getEnd
out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))
}
}
我們還可以將ReduceFunction/AggregateFunction和ProcessWindowFunction結(jié)合起來使用。ReduceFunction/AggregateFunction做增量聚合,ProcessWindowFunction提供更多的對數(shù)據(jù)流的訪問權(quán)限。如果只使用ProcessWindowFunction(底層的實現(xiàn)為將事件都保存在ListState中),將會非常占用空間。分配到某個窗口的元素將被提前聚合,而當(dāng)窗口的trigger觸發(fā)時,也就是窗口收集完數(shù)據(jù)關(guān)閉時,將會把聚合結(jié)果發(fā)送到ProcessWindowFunction中,這時Iterable參數(shù)將會只有一個值,就是前面聚合的值。
例子
input
.keyBy(...)
.timeWindow(...)
.reduce(
incrAggregator: ReduceFunction[IN],
function: ProcessWindowFunction[IN, OUT, K, W])
input
.keyBy(...)
.timeWindow(...)
.aggregate(
incrAggregator: AggregateFunction[IN, ACC, V],
windowFunction: ProcessWindowFunction[V, OUT, K, W])
我們把之前的需求重新使用以上兩種方法實現(xiàn)一下。
case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)
val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData
.map(r => (r.id, r.temperature, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.reduce(
(r1: (String, Double, Double), r2: (String, Double, Double)) => {
(r1._1, r1._2.min(r2._2), r1._3.max(r2._3))
},
new AssignWindowEndProcessFunction
)
class AssignWindowEndProcessFunction
extends ProcessWindowFunction[(String, Double, Double),
MinMaxTemp, String, TimeWindow] {
override def process(key: String,
ctx: Context,
minMaxIt: Iterable[(String, Double, Double)],
out: Collector[MinMaxTemp]): Unit = {
val minMax = minMaxIt.head
val windowEnd = ctx.window.getEnd
out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd))
}
}
1.4 自定義窗口操作符
Flink內(nèi)置的window operators分配器已經(jīng)已經(jīng)足夠應(yīng)付大多數(shù)應(yīng)用場景。盡管如此,如果我們需要實現(xiàn)一些復(fù)雜的窗口邏輯,例如:可以發(fā)射早到的事件或者碰到遲到的事件就更新窗口的結(jié)果,或者窗口的開始和結(jié)束決定于特定事件的接收。
DataStream API暴露了接口和方法來自定義窗口操作符。
自定義窗口分配器
自定義窗口計算觸發(fā)器(trigger)
自定義窗口數(shù)據(jù)清理功能(evictor)
當(dāng)一個事件來到窗口操作符,首先將會傳給WindowAssigner來處理。WindowAssigner決定了事件將被分配到哪些窗口。如果窗口不存在,WindowAssigner將會創(chuàng)建一個新的窗口。
如果一個window operator接受了一個增量聚合函數(shù)作為參數(shù),例如ReduceFunction或者AggregateFunction,新到的元素將會立即被聚合,而聚合結(jié)果result將存儲在window中。如果window operator沒有使用增量聚合函數(shù),那么新元素將被添加到ListState中,ListState中保存了所有分配給窗口的元素。
新元素被添加到窗口時,這個新元素同時也被傳給了window的trigger。trigger定義了window何時準備好求值,何時window被清空。trigger可以基于window被分配的元素和注冊的定時器來對窗口的所有元素求值或者在特定事件清空window中所有的元素。
當(dāng)window operator只接收一個增量聚合函數(shù)作為參數(shù)時:
當(dāng)window operator只接收一個全窗口函數(shù)作為參數(shù)時:
當(dāng)window operator接收一個增量聚合函數(shù)和一個全窗口函數(shù)作為參數(shù)時:
evictor是一個可選的組件,可以被注入到ProcessWindowFunction之前或者之后調(diào)用。evictor可以清除掉window中收集的元素。由于evictor需要迭代所有的元素,所以evictor只能使用在沒有增量聚合函數(shù)作為參數(shù)的情況下。
下面的代碼說明了如果使用自定義的trigger和evictor定義一個window operator:
stream
.keyBy(...)
.window(...)
[.trigger(...)]
[.evictor(...)]
.reduce/aggregate/process(...)
注意:每個WindowAssigner都有一個默認的trigger。
窗口生命周期
當(dāng)WindowAssigner分配某個窗口的第一個元素時,這個窗口才會被創(chuàng)建。所以不存在沒有元素的窗口。
一個窗口包含了如下狀態(tài):
Window content
分配到這個窗口的元素 增量聚合的結(jié)果(如果window operator接收了ReduceFunction或者AggregateFunction作為參數(shù))。
Window object
WindowAssigner返回0個,1個或者多個window object。window operator根據(jù)返回的window object來聚合元素。每一個window object包含一個windowEnd時間戳,來區(qū)別于其他窗口。
觸發(fā)器的定時器:一個觸發(fā)器可以注冊定時事件,到了定時的時間可以執(zhí)行相應(yīng)的回調(diào)函數(shù),例如:對窗口進行求值或者清空窗口。
觸發(fā)器中的自定義狀態(tài):觸發(fā)器可以定義和使用自定義的、per-window或者per-key狀態(tài)。這個狀態(tài)完全被觸發(fā)器所控制。而不是被window operator控制。
當(dāng)窗口結(jié)束時間來到,window operator將刪掉這個窗口。窗口結(jié)束時間是由window object的end timestamp所定義的。無論是使用processing time還是event time,窗口結(jié)束時間是什么類型可以調(diào)用WindowAssigner.isEventTime()方法獲得。
窗口分配器(window assigners)
WindowAssigner將會把元素分配到0個,1個或者多個窗口中去。我們看一下WindowAssigner接口:
public abstract class WindowAssigner<T, W extends Window>
implements Serializable {
public abstract Collection<W> assignWindows(
T element,
long timestamp,
WindowAssignerContext context);
public abstract Trigger<T, W> getDefaultTriger(
StreamExecutionEnvironment env);
public abstract TypeSerializer<W> getWindowSerializer(
ExecutionConfig executionConfig);
public abstract boolean isEventTime();
public abstract static class WindowAssignerContext {
public abstract long getCurrentProcessingTime();
}
}
WindowAssigner有兩個泛型參數(shù):
T: 事件的數(shù)據(jù)類型
W: 窗口的類型
下面的代碼創(chuàng)建了一個自定義窗口分配器,是一個30秒的滾動事件時間窗口。
class ThirtySecondsWindows
extends WindowAssigner[Object, TimeWindow] {
val windowSize: Long = 30 * 1000L
override def assignWindows(
o: Object,
ts: Long,
ctx: WindowAssigner.WindowAssignerContext
): java.util.List[TimeWindow] = {
val startTime = ts - (ts % windowSize)
val endTime = startTime + windowSize
Collections.singletonList(new TimeWindow(startTime, endTime))
}
override def getDefaultTrigger(
env: environment.StreamExecutionEnvironment
): Trigger[Object, TimeWindow] = {
EventTimeTrigger.create()
}
override def getWindowSerializer(
executionConfig: ExecutionConfig
): TypeSerializer[TimeWindow] = {
new TimeWindow.Serializer
}
override def isEventTime = true
}
增量聚合示意圖

全窗口聚合示意圖

增量聚合和全窗口聚合結(jié)合使用的示意圖

觸發(fā)器(Triggers)
觸發(fā)器定義了window何時會被求值以及何時發(fā)送求值結(jié)果。觸發(fā)器可以到了特定的時間觸發(fā)也可以碰到特定的事件觸發(fā)。例如:觀察到事件數(shù)量符合一定條件或者觀察到了特定的事件。
默認的觸發(fā)器將會在兩種情況下觸發(fā)
處理時間:機器時間到達處理時間
事件時間:水位線超過了窗口的結(jié)束時間
觸發(fā)器可以訪問流的時間屬性以及定時器,還可以對state狀態(tài)編程。所以觸發(fā)器和process function一樣強大。
例如我們可以實現(xiàn)一個觸發(fā)邏輯:當(dāng)窗口接收到一定數(shù)量的元素時,觸發(fā)器觸發(fā)。再比如當(dāng)窗口接收到一個特定元素時,觸發(fā)器觸發(fā)。還有就是當(dāng)窗口接收到的元素里面包含特定模式(5秒鐘內(nèi)接收到了兩個同樣類型的事件),觸發(fā)器也可以觸發(fā)。在一個事件時間的窗口中,一個自定義的觸發(fā)器可以提前(在水位線沒過窗口結(jié)束時間之前)計算和發(fā)射計算結(jié)果。這是一個常見的低延遲計算策略,盡管計算不完全,但不像默認的那樣需要等待水位線沒過窗口結(jié)束時間。
每次調(diào)用觸發(fā)器都會產(chǎn)生一個TriggerResult來決定窗口接下來發(fā)生什么。TriggerResult可以取以下結(jié)果:
CONTINUE:什么都不做
FIRE:如果window operator有ProcessWindowFunction這個參數(shù),將會調(diào)用這個ProcessWindowFunction。如果窗口僅有增量聚合函數(shù)(ReduceFunction或者AggregateFunction)作為參數(shù),那么當(dāng)前的聚合結(jié)果將會被發(fā)送。窗口的state不變。
PURGE:窗口所有內(nèi)容包括窗口的元數(shù)據(jù)都將被丟棄。
FIRE_AND_PURGE:先對窗口進行求值,再將窗口中的內(nèi)容丟棄。
TriggerResult可能的取值使得我們可以實現(xiàn)很復(fù)雜的窗口邏輯。一個自定義觸發(fā)器可以觸發(fā)多次,可以計算或者更新結(jié)果,可以在發(fā)送結(jié)果之前清空窗口。
接下來我們看一下Trigger API:
public abstract class Trigger<T, W extends Window>
implements Serializable {
TriggerResult onElement(
long timestamp,
W window,
TriggerContext ctx);
public abstract TriggerResult onProcessingTime(
long timestamp,
W window,
TriggerContext ctx);
public abstract TriggerResult onEventTime(
long timestamp,
W window,
TriggerContext ctx);
public boolean canMerge();
public void onMerge(W window, OnMergeContext ctx);
public abstract void clear(W window, TriggerContext ctx);
}
public interface TriggerContext {
long getCurrentProcessingTime();
long getCurrentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
<S extends State> S getPartitionedState(
StateDescriptor<S, ?> stateDescriptor);
}
public interface OnMergeContext extends TriggerContext {
void mergePartitionedState(
StateDescriptor<S, ?> stateDescriptor
);
}
這里要注意兩個地方:清空state和merging合并觸發(fā)器。
當(dāng)在觸發(fā)器中使用per-window state時,這里我們需要保證當(dāng)窗口被刪除時state也要被刪除,否則隨著時間的推移,window operator將會積累越來越多的數(shù)據(jù),最終可能使應(yīng)用崩潰。
當(dāng)窗口被刪除時,為了清空所有狀態(tài),觸發(fā)器的clear()方法需要需要刪掉所有的自定義per-window state,以及使用TriggerContext對象將處理時間和事件時間的定時器都刪除。
下面的例子展示了一個觸發(fā)器在窗口結(jié)束時間之前觸發(fā)。當(dāng)?shù)谝粋€事件被分配到窗口時,這個觸發(fā)器注冊了一個定時器,定時時間為水位線之前一秒鐘。當(dāng)定時事件執(zhí)行,將會注冊一個新的定時事件,這樣,這個觸發(fā)器每秒鐘最多觸發(fā)一次。
scala version
class OneSecondIntervalTrigger
extends Trigger[SensorReading, TimeWindow] {
override def onElement(
SensorReading r,
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext
): TriggerResult = {
val firstSeen: ValueState[Boolean] = ctx
.getPartitionedState(
new ValueStateDescriptor[Boolean](
"firstSeen", classOf[Boolean]
)
)
if (!firstSeen.value()) {
val t = ctx.getCurrentWatermark
+ (1000 - (ctx.getCurrentWatermark % 1000))
ctx.registerEventTimeTimer(t)
ctx.registerEventTimeTimer(window.getEnd)
firstSeen.update(true)
}
TriggerResult.CONTINUE
}
override def onEventTime(
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext
): TriggerResult = {
if (timestamp == window.getEnd) {
TriggerResult.FIRE_AND_PURGE
} else {
val t = ctx.getCurrentWatermark
+ (1000 - (ctx.getCurrentWatermark % 1000))
if (t < window.getEnd) {
ctx.registerEventTimeTimer(t)
}
TriggerResult.FIRE
}
}
override def onProcessingTime(
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext
): TriggerResult = {
TriggerResult.CONTINUE
}
override def clear(
window: TimeWindow,
ctx: Trigger.TriggerContext
): Unit = {
val firstSeen: ValueState[Boolean] = ctx
.getPartitionedState(
new ValueStateDescriptor[Boolean](
"firstSeen", classOf[Boolean]
)
)
firstSeen.clear()
}
}
java version
public class TriggerExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
String[] arr = s.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
})
)
.keyBy(r -> r.f0)
.timeWindow(Time.seconds(5))
.trigger(new OneSecondIntervalTrigger())
.process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
long count = 0L;
for (Tuple2<String, Long> i : iterable) count += 1;
collector.collect("窗口中有 " + count + " 條元素");
}
})
.print();
env.execute();
}
public static class OneSecondIntervalTrigger extends Trigger<Tuple2<String, Long>, TimeWindow> {
// 來一條調(diào)用一次
@Override
public TriggerResult onElement(Tuple2<String, Long> r, long l, TimeWindow window, TriggerContext ctx) throws Exception {
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<Boolean>("first-seen", Types.BOOLEAN)
);
if (firstSeen.value() == null) {
// 4999 + (1000 - 4999 % 1000) = 5000
System.out.println("第一條數(shù)據(jù)來的時候 ctx.getCurrentWatermark() 的值是 " + ctx.getCurrentWatermark());
long t = ctx.getCurrentWatermark() + (1000L - ctx.getCurrentWatermark() % 1000L);
ctx.registerEventTimeTimer(t);
ctx.registerEventTimeTimer(window.getEnd());
firstSeen.update(true);
}
return TriggerResult.CONTINUE;
}
// 定時器邏輯
@Override
public TriggerResult onEventTime(long ts, TimeWindow window, TriggerContext ctx) throws Exception {
if (ts == window.getEnd()) {
return TriggerResult.FIRE_AND_PURGE;
} else {
System.out.println("當(dāng)前水位線是:" + ctx.getCurrentWatermark());
long t = ctx.getCurrentWatermark() + (1000L - ctx.getCurrentWatermark() % 1000L);
if (t < window.getEnd()) {
ctx.registerEventTimeTimer(t);
}
return TriggerResult.FIRE;
}
}
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<Boolean>("first-seen", Types.BOOLEAN)
);
firstSeen.clear();
}
}
}
清理器(EVICTORS)
evictor可以在window function求值之前或者之后移除窗口中的元素。
我們看一下Evictor的接口定義:
public interface Evictor<T, W extends Window>
extends Serializable {
void evictBefore(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
void evictAfter(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);
interface EvictorContext {
long getCurrentProcessingTime();
long getCurrentWatermark();
}
}
evictBefore()和evictAfter()分別在window function計算之前或者之后調(diào)用。Iterable迭代器包含了窗口所有的元素,size為窗口中元素的數(shù)量,window object和EvictorContext可以訪問當(dāng)前處理時間和水位線??梢詫terator調(diào)用remove()方法來移除窗口中的元素。
evictor也經(jīng)常被用在GlobalWindow上,用來清除部分元素,而不是將窗口中的元素全部清空。
數(shù)據(jù)流操作
1 基于時間的雙流Join
數(shù)據(jù)流操作的另一個常見需求是對兩條數(shù)據(jù)流中的事件進行聯(lián)結(jié)(connect)或Join。Flink DataStream API中內(nèi)置有兩個可以根據(jù)時間條件對數(shù)據(jù)流進行Join的算子:基于間隔的Join和基于窗口的Join。本節(jié)我們會對它們進行介紹。
如果Flink內(nèi)置的Join算子無法表達所需的Join語義,那么你可以通過CoProcessFunction、BroadcastProcessFunction或KeyedBroadcastProcessFunction實現(xiàn)自定義的Join邏輯。
注意,你要設(shè)計的Join算子需要具備高效的狀態(tài)訪問模式及有效的狀態(tài)清理策略。
1.1 基于間隔的Join
基于間隔的Join會對兩條流中擁有相同鍵值以及彼此之間時間戳不超過某一指定間隔的事件進行Join。
下圖展示了兩條流(A和B)上基于間隔的Join,如果B中事件的時間戳相較于A中事件的時間戳不早于1小時且不晚于15分鐘,則會將兩個事件Join起來。Join間隔具有對稱性,因此上面的條件也可以表示為A中事件的時間戳相較B中事件的時間戳不早于15分鐘且不晚于1小時。

基于間隔的Join目前只支持事件時間以及INNER JOIN語義(無法發(fā)出未匹配成功的事件)。下面的例子定義了一個基于間隔的Join。
input1
.intervalJoin(input2)
.between(<lower-bound>, <upper-bound>) // 相對于input1的上下界
.process(ProcessJoinFunction) // 處理匹配的事件對
Join成功的事件對會發(fā)送給ProcessJoinFunction。下界和上界分別由負時間間隔和正時間間隔來定義,例如between(Time.hour(-1), Time.minute(15))。在滿足下界值小于上界值的前提下,你可以任意對它們賦值。例如,允許出現(xiàn)B中事件的時間戳相較A中事件的時間戳早1~2小時這樣的條件。
基于間隔的Join需要同時對雙流的記錄進行緩沖。對第一個輸入而言,所有時間戳大于當(dāng)前水位線減去間隔上界的數(shù)據(jù)都會被緩沖起來;對第二個輸入而言,所有時間戳大于當(dāng)前水位線加上間隔下界的數(shù)據(jù)都會被緩沖起來。注意,兩側(cè)邊界值都有可能為負。上圖中的Join需要存儲數(shù)據(jù)流A中所有時間戳大于當(dāng)前水位線減去15分鐘的記錄,以及數(shù)據(jù)流B中所有時間戳大于當(dāng)前水位線減去1小時的記錄。不難想象,如果兩條流的事件時間不同步,那么Join所需的存儲就會顯著增加,因為水位線總是由“較慢”的那條流來決定。
例子:每個用戶的點擊Join這個用戶最近10分鐘內(nèi)的瀏覽
scala version
object IntervalJoinExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
/*
A.intervalJoin(B).between(lowerBound, upperBound)
B.intervalJoin(A).between(-upperBound, -lowerBound)
*/
val stream1 = env
.fromElements(
("user_1", 10 * 60 * 1000L, "click"),
("user_1", 16 * 60 * 1000L, "click")
)
.assignAscendingTimestamps(_._2)
.keyBy(r => r._1)
val stream2 = env
.fromElements(
("user_1", 5 * 60 * 1000L, "browse"),
("user_1", 6 * 60 * 1000L, "browse")
)
.assignAscendingTimestamps(_._2)
.keyBy(r => r._1)
stream1
.intervalJoin(stream2)
.between(Time.minutes(-10), Time.minutes(0))
.process(new ProcessJoinFunction[(String, Long, String), (String, Long, String), String] {
override def processElement(in1: (String, Long, String), in2: (String, Long, String), context: ProcessJoinFunction[(String, Long, String), (String, Long, String), String]#Context, collector: Collector[String]): Unit = {
collector.collect(in1 + " => " + in2)
}
})
.print()
stream2
.intervalJoin(stream1)
.between(Time.minutes(0), Time.minutes(10))
.process(new ProcessJoinFunction[(String, Long, String), (String, Long, String), String] {
override def processElement(in1: (String, Long, String), in2: (String, Long, String), context: ProcessJoinFunction[(String, Long, String), (String, Long, String), String]#Context, collector: Collector[String]): Unit = {
collector.collect(in1 + " => " + in2)
}
})
.print()
env.execute()
}
}
java version
public class IntervalJoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
KeyedStream<Tuple3<String, Long, String>, String> stream1 = env
.fromElements(
Tuple3.of("user_1", 10 * 60 * 1000L, "click")
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Long, String>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Long, String>>() {
@Override
public long extractTimestamp(Tuple3<String, Long, String> stringLongStringTuple3, long l) {
return stringLongStringTuple3.f1;
}
})
)
.keyBy(r -> r.f0);
KeyedStream<Tuple3<String, Long, String>, String> stream2 = env
.fromElements(
Tuple3.of("user_1", 5 * 60 * 1000L, "browse"),
Tuple3.of("user_1", 6 * 60 * 1000L, "browse")
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple3<String, Long, String>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Long, String>>() {
@Override
public long extractTimestamp(Tuple3<String, Long, String> stringLongStringTuple3, long l) {
return stringLongStringTuple3.f1;
}
})
)
.keyBy(r -> r.f0);
stream1
.intervalJoin(stream2)
.between(Time.minutes(-10), Time.minutes(0))
.process(new ProcessJoinFunction<Tuple3<String, Long, String>, Tuple3<String, Long, String>, String>() {
@Override
public void processElement(Tuple3<String, Long, String> stringLongStringTuple3, Tuple3<String, Long, String> stringLongStringTuple32, Context context, Collector<String> collector) throws Exception {
collector.collect(stringLongStringTuple3 + " => " + stringLongStringTuple32);
}
})
.print();
env.execute();
}
}
1.2 基于窗口的Join
顧名思義,基于窗口的Join需要用到Flink中的窗口機制。其原理是將兩條輸入流中的元素分配到公共窗口中并在窗口完成時進行Join(或Cogroup)。
下面的例子展示了如何定義基于窗口的Join。
input1.join(input2)
.where(...) // 為input1指定鍵值屬性
.equalTo(...) // 為input2指定鍵值屬性
.window(...) // 指定WindowAssigner
[.trigger(...)] // 選擇性的指定Trigger
[.evictor(...)] // 選擇性的指定Evictor
.apply(...) // 指定JoinFunction
下圖展示了DataStream API中基于窗口的Join是如何工作的。

兩條輸入流都會根據(jù)各自的鍵值屬性進行分區(qū),公共窗口分配器會將二者的事件映射到公共窗口內(nèi)(其中同時存儲了兩條流中的數(shù)據(jù))。當(dāng)窗口的計時器觸發(fā)時,算子會遍歷兩個輸入中元素的每個組合(叉乘積)去調(diào)用JoinFunction。同時你也可以自定義觸發(fā)器或移除器。由于兩條流中的事件會被映射到同一個窗口中,因此該過程中的觸發(fā)器和移除器與常規(guī)窗口算子中的完全相同。
除了對窗口中的兩條流進行Join,你還可以對它們進行Cogroup,只需將算子定義開始位置的join改為coGroup()即可。Join和Cogroup的總體邏輯相同,二者的唯一區(qū)別是:Join會為兩側(cè)輸入中的每個事件對調(diào)用JoinFunction;而Cogroup中用到的CoGroupFunction會以兩個輸入的元素遍歷器為參數(shù),只在每個窗口中被調(diào)用一次。
注意,對劃分窗口后的數(shù)據(jù)流進行Join可能會產(chǎn)生意想不到的語義。例如,假設(shè)你為執(zhí)行Join操作的算子配置了1小時的滾動窗口,那么一旦來自兩個輸入的元素沒有被劃分到同一窗口,它們就無法Join在一起,即使二者彼此僅相差1秒鐘。
scala version
object TwoWindowJoinExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream1 = env
.fromElements(
("a", 1000L),
("a", 2000L)
)
.assignAscendingTimestamps(_._2)
val stream2 = env
.fromElements(
("a", 3000L),
("a", 4000L)
)
.assignAscendingTimestamps(_._2)
stream1
.join(stream2)
// on A.id = B.id
.where(_._1)
.equalTo(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction[(String, Long), (String, Long), String] {
override def join(in1: (String, Long), in2: (String, Long)): String = {
in1 + " => " + in2
}
})
.print()
env.execute()
}
}
java version
public class TwoWindowJoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<String, Long>> stream1 = env
.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
DataStream<Tuple2<String, Long>> stream2 = env
.fromElements(
Tuple2.of("a", 3000L),
Tuple2.of("b", 3000L),
Tuple2.of("a", 4000L),
Tuple2.of("b", 4000L)
)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(
new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
}
)
);
stream1
.join(stream2)
.where(r -> r.f0)
.equalTo(r -> r.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
@Override
public String join(Tuple2<String, Long> stringLongTuple2, Tuple2<String, Long> stringLongTuple22) throws Exception {
return stringLongTuple2 + " => " + stringLongTuple22;
}
})
.print();
env.execute();
}
}
2 處理遲到的元素
水位線可以用來平衡計算的完整性和延遲兩方面。除非我們選擇一種非常保守的水位線策略(最大延時設(shè)置的非常大,以至于包含了所有的元素,但結(jié)果是非常大的延遲),否則我們總需要處理遲到的元素。
遲到的元素是指當(dāng)這個元素來到時,這個元素所對應(yīng)的窗口已經(jīng)計算完畢了(也就是說水位線已經(jīng)沒過窗口結(jié)束時間了)。這說明遲到這個特性只針對事件時間。
DataStream API提供了三種策略來處理遲到元素
直接拋棄遲到的元素
將遲到的元素發(fā)送到另一條流中去
可以更新窗口已經(jīng)計算完的結(jié)果,并發(fā)出計算結(jié)果。
2.1 拋棄遲到元素
拋棄遲到的元素是event time window operator的默認行為。也就是說一個遲到的元素不會創(chuàng)建一個新的窗口。
process function可以通過比較遲到元素的時間戳和當(dāng)前水位線的大小來很輕易的過濾掉遲到元素。
2.2 重定向遲到元素
遲到的元素也可以使用側(cè)輸出(side output)特性被重定向到另外的一條流中去。遲到元素所組成的側(cè)輸出流可以繼續(xù)處理或者sink到持久化設(shè)施中去。
例子:
scala version
val readings = env
.socketTextStream("localhost", 9999, '\n')
.map(line => {
val arr = line.split(" ")
(arr(0), arr(1).toLong * 1000)
})
.assignAscendingTimestamps(_._2)
val countPer10Secs = readings
.keyBy(_._1)
.timeWindow(Time.seconds(10))
.sideOutputLateData(
new OutputTag[(String, Long)]("late-readings")
)
.process(new CountFunction())
val lateStream = countPer10Secs
.getSideOutput(
new OutputTag[(String, Long)]("late-readings")
)
lateStream.print()
實現(xiàn)CountFunction:
class CountFunction extends ProcessWindowFunction[(String, Long),
String, String, TimeWindow] {
override def process(key: String,
context: Context,
elements: Iterable[(String, Long)],
out: Collector[String]): Unit = {
out.collect("窗口共有" + elements.size + "條數(shù)據(jù)")
}
}
java version
public class RedirectLateEvent {
private static OutputTag<Tuple2<String, Long>> output = new OutputTag<Tuple2<String, Long>>("late-readings"){};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<String, Long>> stream = env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
String[] arr = s.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.
// like scala: assignAscendingTimestamps(_._2)
<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> value, long l) {
return value.f1;
}
})
);
SingleOutputStreamOperator<String> lateReadings = stream
.keyBy(r -> r.f0)
.timeWindow(Time.seconds(5))
.sideOutputLateData(output) // use after keyBy and timeWindow
.process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
long exactSizeIfKnown = iterable.spliterator().getExactSizeIfKnown();
collector.collect(exactSizeIfKnown + " of elements");
}
});
lateReadings.print();
lateReadings.getSideOutput(output).print();
env.execute();
}
}
下面這個例子展示了ProcessFunction如何過濾掉遲到的元素然后將遲到的元素發(fā)送到側(cè)輸出流中去。
scala version
val readings: DataStream[SensorReading] = ...
val filteredReadings: DataStream[SensorReading] = readings
.process(new LateReadingsFilter)
// retrieve late readings
val lateReadings: DataStream[SensorReading] = filteredReadings
.getSideOutput(new OutputTag[SensorReading]("late-readings"))
/** A ProcessFunction that filters out late sensor readings and
* re-directs them to a side output */
class LateReadingsFilter
extends ProcessFunction[SensorReading, SensorReading] {
val lateReadingsOut = new OutputTag[SensorReading]("late-readings")
override def processElement(
SensorReading r,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// compare record timestamp with current watermark
if (r.timestamp < ctx.timerService().currentWatermark()) {
// this is a late reading => redirect it to the side output
ctx.output(lateReadingsOut, r)
} else {
out.collect(r)
}
}
}
java version
public class RedirectLateEvent {
private static OutputTag<String> output = new OutputTag<String>("late-readings"){};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<Tuple2<String, Long>> stream = env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
String[] arr = s.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.
<Tuple2<String, Long>>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> value, long l) {
return value.f1;
}
})
)
.process(new ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public void processElement(Tuple2<String, Long> stringLongTuple2, Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
if (stringLongTuple2.f1 < context.timerService().currentWatermark()) {
context.output(output, "late event is comming!");
} else {
collector.collect(stringLongTuple2);
}
}
});
stream.print();
stream.getSideOutput(output).print();
env.execute();
}
}
2.3 使用遲到元素更新窗口計算結(jié)果
由于存在遲到的元素,所以已經(jīng)計算出的窗口結(jié)果是不準確和不完全的。我們可以使用遲到元素更新已經(jīng)計算完的窗口結(jié)果。
如果我們要求一個operator支持重新計算和更新已經(jīng)發(fā)出的結(jié)果,就需要在第一次發(fā)出結(jié)果以后也要保存之前所有的狀態(tài)。但顯然我們不能一直保存所有的狀態(tài),肯定會在某一個時間點將狀態(tài)清空,而一旦狀態(tài)被清空,結(jié)果就再也不能重新計算或者更新了。而遲到的元素只能被拋棄或者發(fā)送到側(cè)輸出流。
window operator API提供了方法來明確聲明我們要等待遲到元素。當(dāng)使用event-time window,我們可以指定一個時間段叫做allowed lateness。window operator如果設(shè)置了allowed lateness,這個window operator在水位線沒過窗口結(jié)束時間時也將不會刪除窗口和窗口中的狀態(tài)。窗口會在一段時間內(nèi)(allowed lateness設(shè)置的)保留所有的元素。
當(dāng)遲到元素在allowed lateness時間內(nèi)到達時,這個遲到元素會被實時處理并發(fā)送到觸發(fā)器(trigger)。當(dāng)水位線沒過了窗口結(jié)束時間+allowed lateness時間時,窗口會被刪除,并且所有后來的遲到的元素都會被丟棄。
Allowed lateness可以使用allowedLateness()方法來指定,如下所示:
val readings: DataStream[SensorReading] = ...
val countPer10Secs: DataStream[(String, Long, Int, String)] = readings
.keyBy(_.id)
.timeWindow(Time.seconds(10))
// process late readings for 5 additional seconds
.allowedLateness(Time.seconds(5))
// count readings and update results if late readings arrive
.process(new UpdatingWindowCountFunction)
/** A counting WindowProcessFunction that distinguishes between
* first results and updates. */
class UpdatingWindowCountFunction
extends ProcessWindowFunction[SensorReading,
(String, Long, Int, String), String, TimeWindow] {
override def process(
id: String,
ctx: Context,
elements: Iterable[SensorReading],
out: Collector[(String, Long, Int, String)]): Unit = {
// count the number of readings
val cnt = elements.count(_ => true)
// state to check if this is
// the first evaluation of the window or not
val isUpdate = ctx.windowState.getState(
new ValueStateDescriptor[Boolean](
"isUpdate",
Types.of[Boolean]))
if (!isUpdate.value()) {
// first evaluation, emit first result
out.collect((id, ctx.window.getEnd, cnt, "first"))
isUpdate.update(true)
} else {
// not the first evaluation, emit an update
out.collect((id, ctx.window.getEnd, cnt, "update"))
}
}
}
java version
public class UpdateWindowResultWithLateEvent {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);
stream
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
String[] arr = s.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
})
)
.keyBy(r -> r.f0)
.timeWindow(Time.seconds(5))
.allowedLateness(Time.seconds(5))
.process(new UpdateWindowResult())
.print();
env.execute();
}
public static class UpdateWindowResult extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
long count = 0L;
for (Tuple2<String, Long> i : iterable) {
count += 1;
}
// 可見范圍比getRuntimeContext.getState更小,只對當(dāng)前key、當(dāng)前window可見
// 基于窗口的狀態(tài)變量,只能當(dāng)前key和當(dāng)前窗口訪問
ValueState<Boolean> isUpdate = context.windowState().getState(
new ValueStateDescriptor<Boolean>("isUpdate", Types.BOOLEAN)
);
// 當(dāng)水位線超過窗口結(jié)束時間時,觸發(fā)窗口的第一次計算!
if (isUpdate.value() == null) {
collector.collect("窗口第一次觸發(fā)計算!一共有 " + count + " 條數(shù)據(jù)!");
isUpdate.update(true);
} else {
collector.collect("窗口更新了!一共有 " + count + " 條數(shù)據(jù)!");
}
}
}
}

八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南
我們在學(xué)習(xí)Flink的時候,到底在學(xué)習(xí)什么?
193篇文章暴揍Flink,這個合集你需要關(guān)注一下
Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點
我們在學(xué)習(xí)Spark的時候,到底在學(xué)習(xí)什么?
硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
4萬字長文 | ClickHouse基礎(chǔ)&實踐&調(diào)優(yōu)全視角解析
【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談
大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
你好,我是王知無,一個大數(shù)據(jù)領(lǐng)域的硬核原創(chuàng)作者。
做過后端架構(gòu)、數(shù)據(jù)中間件、數(shù)據(jù)平臺&架構(gòu)、算法工程化。
專注大數(shù)據(jù)領(lǐng)域?qū)崟r動態(tài)&技術(shù)提升&個人成長&職場進階,歡迎關(guān)注。
