Flink CEP 原理和案例詳解
點擊上方藍色字體,選擇“設(shè)為星標”

1 概念
(1)定義
復合事件處理(Complex Event Processing,CEP)是一種基于動態(tài)環(huán)境中事件流的分析技術(shù),事件在這里通常是有意義的狀態(tài)變化,通過分析事件間的關(guān)系,利用過濾、關(guān)聯(lián)、聚合等技術(shù),根據(jù)事件間的時序關(guān)系和聚合關(guān)系制定檢測規(guī)則,持續(xù)地從事件流中查詢出符合要求的事件序列,最終分析得到更復雜的復合事件。
(2)特征
CEP的特征如下:
??????目標:從有序的簡單事件流中發(fā)現(xiàn)一些高階特征;
??????輸入:一個或多個簡單事件構(gòu)成的事件流;
??????處理:識別簡單事件之間的內(nèi)在聯(lián)系,多個符合一定規(guī)則的簡單事件構(gòu)成復雜事件;
??????輸出:滿足規(guī)則的復雜事件。
(3)功能
CEP用于分析低延遲、頻繁產(chǎn)生的不同來源的事件流。CEP可以幫助在復雜的、不相關(guān)的時間流中找出有意義的模式和復雜的關(guān)系,以接近實時或準實時的獲得通知或組織一些行為。
CEP支持在流上進行模式匹配,根據(jù)模式的條件不同,分為連續(xù)的條件或不連續(xù)的條件;模式的條件允許有時間的限制,當條件范圍內(nèi)沒有達到滿足的條件時,會導致模式匹配超時。
??????看起來很簡單,但是它有很多不同的功能:
??????① 輸入的流數(shù)據(jù),盡快產(chǎn)生結(jié)果;
??????② 在2個事件流上,基于時間進行聚合類的計算;
??????③ 提供實時/準實時的警告和通知;
??????④ 在多樣的數(shù)據(jù)源中產(chǎn)生關(guān)聯(lián)分析模式;
??????⑤ 高吞吐、低延遲的處理
??????市場上有多種CEP的解決方案,例如Spark、Samza、Beam等,但他們都沒有提供專門的庫支持。然而,F(xiàn)link提供了專門的CEP庫。
(4)主要組件
????? Flink為CEP提供了專門的Flink CEP library,它包含如下組件:Event Stream、Pattern定義、Pattern檢測和生成Alert。
首先,開發(fā)人員要在DataStream流上定義出模式條件,之后Flink CEP引擎進行模式檢測,必要時生成警告。
2 Pattern API
處理事件的規(guī)則,被叫作模式(Pattern)。
Flink CEP提供了Pattern API用于對輸入流數(shù)據(jù)進行復雜事件規(guī)則定義,用來提取符合規(guī)則的事件序列。
模式大致分為三類:
?① 個體模式(Individual Patterns)
組成復雜規(guī)則的每一個單獨的模式定義,就是個體模式。
start.times(3).where(_.behavior.startsWith(‘fav’))?② 組合模式(Combining Patterns,也叫模式序列)
很多個體模式組合起來,就形成了整個的模式序列。
模式序列必須以一個初始模式開始:
val start = Pattern.begin(‘start’)
③ 模式組(Group of Pattern)
將一個模式序列作為條件嵌套在個體模式里,成為一組模式。
2.1個體模式
個體模式包括單例模式和循環(huán)模式。單例模式只接收一個事件,而循環(huán)模式可以接收多個事件。
(1)量詞
可以在一個個體模式后追加量詞,也就是指定循環(huán)次數(shù)
// 匹配出現(xiàn)4次start.time(4)// 匹配出現(xiàn)0次或4次start.time(4).optional// 匹配出現(xiàn)2、3或4次start.time(2,4)// 匹配出現(xiàn)2、3或4次,并且盡可能多地重復匹配start.time(2,4).greedy// 匹配出現(xiàn)1次或多次start.oneOrMore// 匹配出現(xiàn)0、2或多次,并且盡可能多地重復匹配start.timesOrMore(2).optional.greedy
(2)條件
每個模式都需要指定觸發(fā)條件,作為模式是否接受事件進入的判斷依據(jù)。CEP中的個體模式主要通過調(diào)用.where()、.or()和.until()來指定條件。按不同的調(diào)用方式,可以分成以下幾類:
??????① 簡單條件
通過.where()方法對事件中的字段進行判斷篩選,決定是否接收該事件
start.where(event=>event.getName.startsWith(“foo”))??????② 組合條件
將簡單的條件進行合并;or()方法表示或邏輯相連,where的直接組合就相當于與and。
Pattern.where(event => …/*some condition*/).or(event => /*or condition*/)??????③ 終止條件
如果使用了oneOrMore或者oneOrMore.optional,建議使用.until()作為終止條件,以便清理狀態(tài)。
??????④ 迭代條件
能夠?qū)δJ街八薪邮盏氖录M行處理;調(diào)用.where((value,ctx) => {…}),可以調(diào)用ctx.getEventForPattern(“name”)
2.2 模式序列
??????不同的近鄰模式如下圖:
(1)嚴格近鄰
所有事件按照嚴格的順序出現(xiàn),中間沒有任何不匹配的事件,由.next()指定。例如對于模式“a next b”,事件序列“a,c,b1,b2”沒有匹配。
(2)寬松近鄰
允許中間出現(xiàn)不匹配的事件,由.followedBy()指定。例如對于模式“a followedBy b”,事件序列“a,c,b1,b2”匹配為{a,b1}。
(3)非確定性寬松近鄰
進一步放寬條件,之前已經(jīng)匹配過的事件也可以再次使用,由.followedByAny()指定。例如對于模式“a followedByAny b”,事件序列“a,c,b1,b2”匹配為{ab1},{a,b2}。
除了以上模式序列外,還可以定義“不希望出現(xiàn)某種近鄰關(guān)系”:
????? .notNext():不想讓某個事件嚴格緊鄰前一個事件發(fā)生。
????? .notFollowedBy():不想讓某個事件在兩個事件之間發(fā)生。
需要注意:①所有模式序列必須以.begin()開始;②模式序列不能以.notFollowedBy()結(jié)束;③“not”類型的模式不能被optional所修飾;④可以為模式指定時間約束,用來要求在多長時間內(nèi)匹配有效。
next.within(Time.seconds(10))2.3 模式的檢測
指定要查找的模式序列后,就可以將其應用于輸入流以檢測潛在匹配。調(diào)用CEP.pattern(),給定輸入流和模式,就能得到一個PatternStream。
val input:DataStream[Event] = …val pattern:Pattern[Event,_] = …val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)
2.4 匹配事件的提取
創(chuàng)建PatternStream之后,就可以應用select或者flatSelect方法,從檢測到的事件序列中提取事件了。
select()方法需要輸入一個select function作為參數(shù),每個成功匹配的事件序列都會調(diào)用它。
select()以一個Map[String,Iterable[IN]]來接收匹配到的事件序列,其中key就是每個模式的名稱,而value就是所有接收到的事件的Iterable類型。
def selectFn(pattern : Map[String,Iterable[IN]]):OUT={val startEvent = pattern.get(“start”).get.nextval endEvent = pattern.get(“end”).get.nextOUT(startEvent, endEvent)}
flatSelect通過實現(xiàn)PatternFlatSelectFunction實現(xiàn)與select相似的功能。唯一的區(qū)別就是flatSelect方法可以返回多條記錄,它通過一個Collector[OUT]類型的參數(shù)來將要輸出的數(shù)據(jù)傳遞到下游。
2.5超時事件的提取
當一個模式通過within關(guān)鍵字定義了檢測窗口時間時,部分事件序列可能因為超過窗口長度而被丟棄;為了能夠處理這些超時的部分匹配,select和flatSelect API調(diào)用允許指定超時處理程序。
3 Flink CEP實戰(zhàn)
為了使用Flink CEP,需要導入pom依賴。
<dependency><groupId>org.apache.flinkgroupId><artifactId>flink-cep-scala_2.11artifactId><version>1.7.0version>dependency>
LoginLog.csv中的數(shù)據(jù)格式為:
5402,83.149.11.115,success,155843081523064,66.249.3.15,fail,15584308265692,80.149.25.29,fail,15584308337233,86.226.15.75,success,15584308325692,80.149.25.29,success,155843084029607,66.249.73.135,success,1558430841
需求:檢測一個用戶在3秒內(nèi)連續(xù)登陸失敗。
// 輸入的登錄事件樣例類case class LoginEvent( userId: Long, ip: String, eventType: String, eventTime: Long )// 輸出的異常報警信息樣例類case class Warning( userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)object LoginFailWithCep {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)// 1. 讀取事件數(shù)據(jù),創(chuàng)建簡單事件流val resource = getClass.getResource("/LoginLog.csv")val loginEventStream = env.readTextFile(resource.getPath).map( data => {val dataArray = data.split(",")LoginEvent( dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong )} ).assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L} ).keyBy(_.userId)// 2. 定義匹配模式val loginFailPattern = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail").next("next").where(_.eventType == "fail").within(Time.seconds(3))// 3. 在事件流上應用模式,得到一個pattern streamval patternStream = CEP.pattern(loginEventStream, loginFailPattern)// 4. 從pattern stream上應用select function,檢出匹配事件序列val loginFailDataStream = patternStream.select( new LoginFailMatch() )loginFailDataStream.print()env.execute("login fail with cep job")}}class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning]{override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {// 從map中按照名稱取出對應的事件// val iter = map.get("begin").iterator()val firstFail = map.get("begin").iterator().next()val lastFail = map.get("next").iterator().next()Warning( firstFail.userId, firstFail.eventTime, lastFail.eventTime, "login fail!" )}}
4 總結(jié)
本章主要圍繞scala語言來講解Flink CEP庫。其實,F(xiàn)link CEP也有SQL的實現(xiàn)。
文章不錯?點個【在看】吧!??



