Flink CEP的基石:NFA-b自動(dòng)機(jī)原理簡(jiǎn)介
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

前言
Flink的復(fù)雜事件處理(complex event processing, CEP)庫(kù)能夠在無(wú)界數(shù)據(jù)流中通過(guò)匹配定義好的事件模式來(lái)發(fā)現(xiàn)一系列事件之間的關(guān)聯(lián)規(guī)律,從而有效支持趨勢(shì)分析、風(fēng)險(xiǎn)監(jiān)控、欺詐檢測(cè)等業(yè)務(wù)場(chǎng)景。它提供了一套簡(jiǎn)單易用、表達(dá)性強(qiáng)的API,例如,在10秒的時(shí)間窗口內(nèi)檢測(cè)事件的報(bào)警級(jí)別:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val partitionedInput = sourceStream.keyBy(event => event.getId)
// start[] -> middle[name = 'error'] -> .. -> end[name = 'critical'] within 10 secs
val pattern = Pattern.begin[Event]("start")
.next("middle").where(_.getName == "error")
.followedBy("end").where(_.getName == "critical")
.within(Time.seconds(10))
val patternStream = CEP.pattern(partitionedInput, pattern)
val alerts = patternStream.select(createAlert(_))那么,F(xiàn)link CEP是采用什么方法匹配事件規(guī)則的呢?在源碼注釋中,可以得知它是基于《Efficient Pattern Matching over Event Streams》這篇論文的思想實(shí)現(xiàn)的。該論文提出了一種在事件流上進(jìn)行高效模式匹配的方法,即帶匹配緩存的非確定有限狀態(tài)機(jī),又稱(chēng)為NFAb自動(dòng)機(jī)。本文先介紹NFAb自動(dòng)機(jī)的相關(guān)原理,在之后的文章中,再結(jié)合源碼講解Flink CEP庫(kù)的具體實(shí)現(xiàn)。
NFAb自動(dòng)機(jī)的定義與構(gòu)造
在大學(xué)《形式語(yǔ)言與自動(dòng)機(jī)》課程中,我們都學(xué)習(xí)過(guò)非確定有限狀態(tài)機(jī)(NFA),用一句話概括就是:對(duì)于每個(gè)<狀態(tài),輸入符號(hào)>二元組,其狀態(tài)轉(zhuǎn)移可以有多個(gè),而不是確定的一個(gè)。NFAb自動(dòng)機(jī)的形式化定義與普通NFA略有不同,為五元組:

其中:
Q為狀態(tài)集合;
E為表示狀態(tài)轉(zhuǎn)移的有向邊集合;
θ為表示狀態(tài)轉(zhuǎn)移的公式集合,與E共同作用;
q1表示起始狀態(tài);
F表示結(jié)束狀態(tài)。
下面通過(guò)實(shí)例來(lái)構(gòu)造NFAb自動(dòng)機(jī)。先通過(guò)SASE+語(yǔ)言(一種專(zhuān)門(mén)用來(lái)描述CEP pattern的通用語(yǔ)言)定義如下的股票趨勢(shì)匹配模式:
PATTERN SEQ(Stock+ a[], Stock b)
WHERE skip_till_next_match(a[], b) {
[symbol] // 表示只考慮相同的事件類(lèi)型,此處恒為真
AND a[1].volume > 1000
AND a[i].price > avg(a[..i-1].price)
AND b.volume < 80% * a[a.LEN].volume
} WITHIN 1 hour // 時(shí)間窗口該模式以1小時(shí)作為時(shí)間窗口的長(zhǎng)度,以“交易量大于1000”作為匹配序列的起始,且要求序列中股票的最近價(jià)格必須高于之前所有交易價(jià)格的均值。當(dāng)檢測(cè)到該股票的交易量下跌到最近一次交易量的80%以下時(shí),匹配成功結(jié)束。
根據(jù)上面的條件,構(gòu)造出NFAb自動(dòng)機(jī),如下圖所示。這也是Flink CEP中NFACompiler組件需要做的事情。

注意∧符號(hào)表示與,∨符號(hào)表示或,┐符號(hào)表示非
匹配序列a[]的生成實(shí)際上就是構(gòu)造符合謂詞約束的事件的正閉包Stock+ a[](克林閉包去掉ε)。也就是說(shuō),a[1]是上述自動(dòng)機(jī)的起始狀態(tài)(交易量大于1000),a[i]是正在構(gòu)造正閉包的狀態(tài)(最近價(jià)格高于之前所有交易價(jià)格的均值)。而b是從閉包中跳出并匹配下一事件的狀態(tài)(交易量下跌到最近一次交易量的80%以下)。
NFAb自動(dòng)機(jī)的每個(gè)狀態(tài)都有各自的匹配緩存,用于在運(yùn)行時(shí)存儲(chǔ)當(dāng)前的匹配結(jié)果。關(guān)于匹配緩存的細(xì)節(jié),后文會(huì)講到。
狀態(tài)轉(zhuǎn)移語(yǔ)義
復(fù)雜事件的匹配過(guò)程本質(zhì)上就是輸入事件流驅(qū)動(dòng)NFAb自動(dòng)機(jī)進(jìn)行狀態(tài)轉(zhuǎn)移的過(guò)程。根據(jù)θ集合定義的條件,在有向邊集合E上可以定義4種狀態(tài)轉(zhuǎn)移語(yǔ)義。
begin:消費(fèi)輸入事件,存入緩存,并轉(zhuǎn)移到下一個(gè)狀態(tài);
take:消費(fèi)輸入事件,存入緩存,并保持當(dāng)前狀態(tài);
ignore:忽略輸入事件,不存入緩存,并保持當(dāng)前狀態(tài);
proceed:感知輸入事件,轉(zhuǎn)移到下一個(gè)狀態(tài),同時(shí)保留該事件給下一個(gè)狀態(tài)處理。
結(jié)合這4種狀態(tài)轉(zhuǎn)移語(yǔ)義,我們就可以讀懂上圖中的轉(zhuǎn)移公式了。Flink CEP的StateTransitionAction定義中沒(méi)有begin語(yǔ)義,僅有take、ignore和proceed語(yǔ)義,但是它和NFAb自動(dòng)機(jī)是等價(jià)的,之后分析源碼時(shí)將會(huì)看到。
事件選擇策略
所謂事件選擇策略,就是指選擇符合條件的事件進(jìn)入正閉包——即擴(kuò)展匹配序列的方法。在時(shí)間窗口的限制之內(nèi),常用的有以下三種策略。
strict(嚴(yán)格連續(xù)):嚴(yán)格按順序選擇所有符合條件的事件,途中不能出現(xiàn)不符合條件的事件,對(duì)應(yīng)Flink CEP API中的
Pattern.next()/notNext();skip till next match(寬松連續(xù)):按順序選擇所有符合條件的事件,而途中不符合條件的事件被忽略,對(duì)應(yīng)Flink CEP API中的
Pattern.followedBy()/notFollowedBy()。上述SASE+語(yǔ)言描述的pattern使用的就是這個(gè)策略;skip till any match(可變寬松連續(xù)):在skip till next match的基礎(chǔ)上,還允許忽略一些符合條件的事件,以盡量延長(zhǎng)匹配序列的長(zhǎng)度,對(duì)應(yīng)Flink CEP API中的
Pattern.followedByAny()。
以skip till next match策略為例,給出如下的示例數(shù)據(jù),可以產(chǎn)生3個(gè)匹配序列R1、R2、R3,如圖所示。

共享版本匹配緩存
仍然考慮上一節(jié)的圖,回顧一下a[i]狀態(tài)的take和proceed轉(zhuǎn)移邏輯:

可見(jiàn),在e6到達(dá)NFA時(shí),可以同時(shí)滿足a[i]_take和a[i]_proceed的轉(zhuǎn)移(這里正好體現(xiàn)出了NFA的不確定性),所以原本的一個(gè)序列會(huì)在此分裂成兩個(gè):其中一個(gè)(R1)終止匹配,另一個(gè)(R3)繼續(xù)匹配。同理,當(dāng)e3到達(dá)NFA時(shí),同時(shí)滿足a[1]_begin和a[i]_take的轉(zhuǎn)移,所以又會(huì)出現(xiàn)一個(gè)序列R2。
由上可知,這些序列之間的重合是比較大的,如果都按原樣存儲(chǔ)在匹配緩存中,會(huì)造成比較大的膨脹。為了避免這個(gè)問(wèn)題,論文中設(shè)計(jì)了一種科學(xué)的緩存結(jié)構(gòu),稱(chēng)為shared versioned match buffer,即“共享版本匹配緩存”,如下圖所示。

其中圖a、b、c是原始的R1、R2、R3緩存,圖d則是整合在一起的共享版本緩存。它會(huì)將所有序列的前向指針附加上一個(gè)版本號(hào)(采用杜威十進(jìn)制法,點(diǎn)號(hào)分隔),并且遵循以下兩個(gè)規(guī)則:
遷移到下一個(gè)狀態(tài)時(shí),版本號(hào)增加一位,如a[1]狀態(tài)的版本號(hào)是1(為了符合習(xí)慣寫(xiě)作1.0),a[i]狀態(tài)的版本號(hào)是1.0、1.1,b狀態(tài)的版本號(hào)是1.0.0、1.1.0……以此類(lèi)推;
當(dāng)序列發(fā)生分裂時(shí),處于當(dāng)前狀態(tài)的版本號(hào)位加1。例如e3事件產(chǎn)生了2.0版本,e6事件產(chǎn)生了1.1版本。
依照這種規(guī)則,就可以根據(jù)前向指針上版本號(hào)的遞增規(guī)律和前綴來(lái)回溯出正確的序列了。Flink CEP中將此緩存設(shè)計(jì)為SharedBuffer類(lèi),但是版本的設(shè)計(jì)有些不同,之后再提。
計(jì)算狀態(tài)
對(duì)于每一個(gè)序列,NFAb自動(dòng)機(jī)還需要維護(hù)一些最基礎(chǔ)的狀態(tài)數(shù)據(jù),以方便執(zhí)行狀態(tài)轉(zhuǎn)移和匹配邏輯,論文中將其稱(chēng)為computation state,即計(jì)算狀態(tài)?;A(chǔ)的計(jì)算狀態(tài)結(jié)構(gòu)如下圖所示,包含以下數(shù)據(jù)項(xiàng):
當(dāng)前的版本號(hào);
當(dāng)前的狀態(tài);
指向匹配緩存中最近一個(gè)事件的指針;
整個(gè)序列的起始時(shí)間;
其他必要的上下文數(shù)據(jù)存儲(chǔ)。以股票趨勢(shì)數(shù)據(jù)為例,會(huì)維護(hù)正閉包內(nèi)的事件數(shù)、價(jià)格之和以及交易量等。

Flink CEP框架用ComputationState類(lèi)來(lái)維護(hù)計(jì)算狀態(tài),大體思路與論文相同。

版權(quán)聲明:
文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??




