<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink CEP 原理和案例詳解

          共 5871字,需瀏覽 12分鐘

           ·

          2020-08-18 09:00

          點擊上方藍色字體,選擇“設(shè)為星標

          回復”資源“獲取更多資源

          大數(shù)據(jù)技術(shù)與架構(gòu)
          點擊右側(cè)關(guān)注,大數(shù)據(jù)開發(fā)領(lǐng)域最強公眾號!

          暴走大數(shù)據(jù)
          點擊右側(cè)關(guān)注,暴走大數(shù)據(jù)!

          1 概念

          (1)定義
          復合事件處理(Complex Event Processing,CEP)是一種基于動態(tài)環(huán)境中事件流的分析技術(shù),事件在這里通常是有意義的狀態(tài)變化,通過分析事件間的關(guān)系,利用過濾、關(guān)聯(lián)、聚合等技術(shù),根據(jù)事件間的時序關(guān)系和聚合關(guān)系制定檢測規(guī)則,持續(xù)地從事件流中查詢出符合要求的事件序列,最終分析得到更復雜的復合事件。
          (2)特征
          CEP的特征如下:
          ??????目標:從有序的簡單事件流中發(fā)現(xiàn)一些高階特征;
          ??????輸入:一個或多個簡單事件構(gòu)成的事件流;
          ??????處理:識別簡單事件之間的內(nèi)在聯(lián)系,多個符合一定規(guī)則的簡單事件構(gòu)成復雜事件;
          ??????輸出:滿足規(guī)則的復雜事件。

          (3)功能
          CEP用于分析低延遲、頻繁產(chǎn)生的不同來源的事件流。CEP可以幫助在復雜的、不相關(guān)的時間流中找出有意義的模式和復雜的關(guān)系,以接近實時或準實時的獲得通知或組織一些行為。
          CEP支持在流上進行模式匹配,根據(jù)模式的條件不同,分為連續(xù)的條件或不連續(xù)的條件;模式的條件允許有時間的限制,當條件范圍內(nèi)沒有達到滿足的條件時,會導致模式匹配超時。
          ??????看起來很簡單,但是它有很多不同的功能:
          ???
          ???① 輸入的流數(shù)據(jù),盡快產(chǎn)生結(jié)果;
          ??????② 在2個事件流上,基于時間進行聚合類的計算;
          ??????③ 提供實時/準實時的警告和通知;
          ??????④ 在多樣的數(shù)據(jù)源中產(chǎn)生關(guān)聯(lián)分析模式;
          ??????⑤ 高吞吐、低延遲的處理
          ??????市場上有多種CEP的解決方案,例如Spark、Samza、Beam等,但他們都沒有提供專門的庫支持。然而,F(xiàn)link提供了專門的CEP庫。
          (4)主要組件
          ????? Flink為CEP提供了專門的Flink CEP library,它包含如下組件:Event Stream、Pattern定義、Pattern檢測和生成Alert。
          首先,開發(fā)人員要在DataStream流上定義出模式條件,之后Flink CEP引擎進行模式檢測,必要時生成警告。

          2 Pattern API

          處理事件的規(guī)則,被叫作模式(Pattern)。
          Flink CEP提供了Pattern API用于對輸入流數(shù)據(jù)進行復雜事件規(guī)則定義,用來提取符合規(guī)則的事件序列。
          模式大致分為三類:
          ?① 個體模式(Individual Patterns)
          組成復雜規(guī)則的每一個單獨的模式定義,就是個體模式。

          start.times(3).where(_.behavior.startsWith(‘fav’))

          ?② 組合模式(Combining Patterns,也叫模式序列)
          很多個體模式組合起來,就形成了整個的模式序列。
          模式序列必須以一個初始模式開始:

          val start = Pattern.begin(‘start’)

          ③ 模式組(Group of Pattern)

          將一個模式序列作為條件嵌套在個體模式里,成為一組模式。

          2.1個體模式

          個體模式包括單例模式和循環(huán)模式。單例模式只接收一個事件,而循環(huán)模式可以接收多個事件。
          (1)量詞
          可以在一個個體模式后追加量詞,也就是指定循環(huán)次數(shù)

          // 匹配出現(xiàn)4次start.time(4)// 匹配出現(xiàn)0次或4次start.time(4).optional// 匹配出現(xiàn)2、3或4次start.time(2,4)// 匹配出現(xiàn)2、3或4次,并且盡可能多地重復匹配start.time(2,4).greedy// 匹配出現(xiàn)1次或多次start.oneOrMore// 匹配出現(xiàn)0、2或多次,并且盡可能多地重復匹配start.timesOrMore(2).optional.greedy

          (2)條件

          每個模式都需要指定觸發(fā)條件,作為模式是否接受事件進入的判斷依據(jù)。CEP中的個體模式主要通過調(diào)用.where()、.or()和.until()來指定條件。按不同的調(diào)用方式,可以分成以下幾類:
          ??????① 簡單條件
          通過.where()方法對事件中的字段進行判斷篩選,決定是否接收該事件

          start.where(event=>event.getName.startsWith(“foo”))

          ??????② 組合條件
          將簡單的條件進行合并;or()方法表示或邏輯相連,where的直接組合就相當于與and。

          Pattern.where(event =>/*some condition*/).or(event => /*or condition*/)

          ??????③ 終止條件
          如果使用了oneOrMore或者oneOrMore.optional,建議使用.until()作為終止條件,以便清理狀態(tài)。
          ??????④ 迭代條件
          能夠?qū)δJ街八薪邮盏氖录M行處理;調(diào)用.where((value,ctx) => {…}),可以調(diào)用ctx.getEventForPattern(“name”)

          2.2 模式序列

          ??????不同的近鄰模式如下圖:

          (1)嚴格近鄰
          所有事件按照嚴格的順序出現(xiàn),中間沒有任何不匹配的事件,由.next()指定。例如對于模式“a next b”,事件序列“a,c,b1,b2”沒有匹配。
          (2)寬松近鄰
          允許中間出現(xiàn)不匹配的事件,由.followedBy()指定。例如對于模式“a followedBy b”,事件序列“a,c,b1,b2”匹配為{a,b1}。
          (3)非確定性寬松近鄰
          進一步放寬條件,之前已經(jīng)匹配過的事件也可以再次使用,由.followedByAny()指定。例如對于模式“a followedByAny b”,事件序列“a,c,b1,b2”匹配為{ab1},{a,b2}。
          除了以上模式序列外,還可以定義“不希望出現(xiàn)某種近鄰關(guān)系”:
          ????? .notNext():不想讓某個事件嚴格緊鄰前一個事件發(fā)生。
          ????? .notFollowedBy():不想讓某個事件在兩個事件之間發(fā)生。
          需要注意:①所有模式序列必須以.begin()開始;②模式序列不能以.notFollowedBy()結(jié)束;③“not”類型的模式不能被optional所修飾;④可以為模式指定時間約束,用來要求在多長時間內(nèi)匹配有效。

          next.within(Time.seconds(10))

          2.3 模式的檢測

          指定要查找的模式序列后,就可以將其應用于輸入流以檢測潛在匹配。調(diào)用CEP.pattern(),給定輸入流和模式,就能得到一個PatternStream。

          val input:DataStream[Event] = …val pattern:Pattern[Event,_] = …val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)


          2.4 匹配事件的提取

          創(chuàng)建PatternStream之后,就可以應用select或者flatSelect方法,從檢測到的事件序列中提取事件了。
          select()方法需要輸入一個select function作為參數(shù),每個成功匹配的事件序列都會調(diào)用它。
          select()以一個Map[String,Iterable[IN]]來接收匹配到的事件序列,其中key就是每個模式的名稱,而value就是所有接收到的事件的Iterable類型。

          def selectFn(pattern : Map[String,Iterable[IN]]):OUT={  val startEvent = pattern.get(“start”).get.next  val endEvent = pattern.get(“end”).get.next  OUT(startEvent, endEvent)}


          flatSelect通過實現(xiàn)PatternFlatSelectFunction實現(xiàn)與select相似的功能。唯一的區(qū)別就是flatSelect方法可以返回多條記錄,它通過一個Collector[OUT]類型的參數(shù)來將要輸出的數(shù)據(jù)傳遞到下游。

          2.5超時事件的提取

          當一個模式通過within關(guān)鍵字定義了檢測窗口時間時,部分事件序列可能因為超過窗口長度而被丟棄;為了能夠處理這些超時的部分匹配,select和flatSelect API調(diào)用允許指定超時處理程序。

          3 Flink CEP實戰(zhàn)

          為了使用Flink CEP,需要導入pom依賴。

          <dependency><groupId>org.apache.flinkgroupId><artifactId>flink-cep-scala_2.11artifactId><version>1.7.0version>dependency>

          LoginLog.csv中的數(shù)據(jù)格式為:

          5402,83.149.11.115,success,155843081523064,66.249.3.15,fail,15584308265692,80.149.25.29,fail,15584308337233,86.226.15.75,success,15584308325692,80.149.25.29,success,155843084029607,66.249.73.135,success,1558430841

          需求:檢測一個用戶在3秒內(nèi)連續(xù)登陸失敗。

          // 輸入的登錄事件樣例類case class LoginEvent( userId: Long, ip: String, eventType: String, eventTime: Long )// 輸出的異常報警信息樣例類case class Warning( userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)
          object LoginFailWithCep { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1)
          // 1. 讀取事件數(shù)據(jù),創(chuàng)建簡單事件流 val resource = getClass.getResource("/LoginLog.csv") val loginEventStream = env.readTextFile(resource.getPath) .map( data => { val dataArray = data.split(",") LoginEvent( dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong ) } ) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) { override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L } ) .keyBy(_.userId)
          // 2. 定義匹配模式 val loginFailPattern = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail") .next("next").where(_.eventType == "fail") .within(Time.seconds(3))
          // 3. 在事件流上應用模式,得到一個pattern stream val patternStream = CEP.pattern(loginEventStream, loginFailPattern)
          // 4. 從pattern stream上應用select function,檢出匹配事件序列 val loginFailDataStream = patternStream.select( new LoginFailMatch() )
          loginFailDataStream.print()
          env.execute("login fail with cep job") }}
          class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning]{ override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = { // 從map中按照名稱取出對應的事件 // val iter = map.get("begin").iterator() val firstFail = map.get("begin").iterator().next() val lastFail = map.get("next").iterator().next() Warning( firstFail.userId, firstFail.eventTime, lastFail.eventTime, "login fail!" ) }}


          4 總結(jié)

          本章主要圍繞scala語言來講解Flink CEP庫。其實,F(xiàn)link CEP也有SQL的實現(xiàn)。

          歡迎點贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連



          文章不錯?點個【在看】吧!??

          瀏覽 41
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文字幕日产av 中文字幕在线资源 | 伊人色色大香蕉 | 青青综合影视 | 亚洲性爱视频在线观看 | 亚洲人成电影一区二区在线 |