<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark 原理 | Apache Spark 3.2 內(nèi)置支持會(huì)話窗口

          共 3779字,需瀏覽 8分鐘

           ·

          2021-10-24 20:49

          Apache Spark? Structured Streaming 允許用戶在事件時(shí)間的窗口上進(jìn)行聚合。在 Apache Spark 3.2? 之前,Spark 支持滾動(dòng)窗口(tumbling windows)和滑動(dòng)窗口( sliding windows)。在已經(jīng)發(fā)布的 Apache Spark 3.2 中,社區(qū)添加了“會(huì)話窗口(session windows)”作為新支持的窗口類型,它適用于流查詢和批處理查詢

          什么是會(huì)話窗口

          如果想及時(shí)了解Spark、Hadoop或者HBase相關(guān)的文章,歡迎關(guān)注微信公眾號(hào):過(guò)往記憶大數(shù)據(jù) 翻滾窗口是一系列固定大小、不重疊且連續(xù)的時(shí)間間隔。一個(gè)輸入只能綁定到一個(gè)窗口。

          從“固定大小”的角度來(lái)看,滑動(dòng)窗口類似于滾動(dòng)窗口,但是如果滑動(dòng)的持續(xù)時(shí)間小于窗口的持續(xù)時(shí)間,則窗口可以重疊,在這種情況下,可以將輸入綁定到多個(gè)窗口。

          與前兩種類型相比,會(huì)話窗口具有不同的特征。會(huì)話窗口有一個(gè)動(dòng)態(tài)大小的窗口長(zhǎng)度,具體取決于輸入。會(huì)話窗口以輸入開(kāi)始,如果在間隔持續(xù)時(shí)間內(nèi)收到以下輸入,則會(huì)話窗口將自身展開(kāi)。如果在接收到最新輸入后的間隔時(shí)間內(nèi)沒(méi)有收到任何輸入,會(huì)話窗口就會(huì)關(guān)閉。這使您可以對(duì)事件進(jìn)行分組,直到在指定的持續(xù)時(shí)間(不活動(dòng))內(nèi)沒(méi)有新事件。

          它的工作原理類似于具有會(huì)話超時(shí)的網(wǎng)站上的會(huì)話 - 如果您登錄網(wǎng)站并且在一段時(shí)間內(nèi)沒(méi)有顯示任何活動(dòng),則該網(wǎng)站將提示您保留登錄狀態(tài)并強(qiáng)制退出,如果您在這之后仍然不活動(dòng),那么會(huì)話就會(huì)關(guān)閉。但如果我們保持活動(dòng),會(huì)話超時(shí)就會(huì)延長(zhǎng)。

          將此應(yīng)用于會(huì)話窗口:當(dāng)新事件(例如流式作業(yè))發(fā)生時(shí),將啟動(dòng)新的會(huì)話窗口,并且在超時(shí)內(nèi)的后續(xù)事件將包含在同一會(huì)話窗口中。每個(gè)事件都會(huì)延長(zhǎng)會(huì)話超時(shí)時(shí)間,與其他時(shí)間窗口相比,它引入了不同的特性——會(huì)話窗口的持續(xù)時(shí)間不是靜態(tài)的,而翻滾和滑動(dòng)窗口的持續(xù)時(shí)間都是靜態(tài)的。

          如何在查詢中使用會(huì)話窗口

          在 Spark 3.2 版本之前,我們需要使用 flatMapGroupsWithState 來(lái)處理會(huì)話窗口。您需要設(shè)計(jì)自己的邏輯來(lái)定義會(huì)話窗口以及如何在同一會(huì)話中聚合輸入。這帶來(lái)了幾個(gè)缺點(diǎn):

          ?我們不能利用內(nèi)置的聚合函數(shù),如 count, sum 等,并必須自己實(shí)現(xiàn)它們。?考慮到各種輸出模式和輸入的延遲,實(shí)現(xiàn)邏輯并非易事。?PySpark 中沒(méi)有 flatMapGroupsWithState,因此,我們需要通過(guò) Java/Scala 精心設(shè)計(jì)查詢。

          現(xiàn)在,Spark 提供了與使用時(shí)間窗口(time windows)相同的用戶體驗(yàn)。“在 Structured Streaming 中,在事件時(shí)間上表達(dá)這樣的窗口只是執(zhí)行一個(gè)特殊的分組”這句話仍然是正確的。對(duì)于滾動(dòng)和滑動(dòng)窗口,請(qǐng)使用 window 函數(shù);對(duì)于會(huì)話窗口,引入了一個(gè)新的函數(shù) session_window。

          例如,在事件中的 eventTime 列上進(jìn)行 5 分鐘的滾動(dòng)(非重疊)窗口可以描述如下。

          # tumbling windowwindowedCountsDF = \  eventsDF \    .withWatermark("eventTime", "10 minutes") \    .groupBy(“deviceId”, window("eventTime", "10 minutes") \    .count()# sliding windowwindowedCountsDF = \  eventsDF \    .withWatermark("eventTime", "10 minutes") \    .groupBy(“deviceId”, window("eventTime", "10 minutes", "5 minutes")) \    .count()

          可以將 window 替換成 session_window 實(shí)現(xiàn)會(huì)話窗口:

          # session windowwindowedCountsDF = \  eventsDF \    .withWatermark("eventTime", "10 minutes") \    .groupBy("deviceId", session_window("eventTime", "5 minutes")) \    .count()

          具有動(dòng)態(tài)間隙持續(xù)時(shí)間的會(huì)話窗口

          會(huì)話窗口除了具有相同的跨會(huì)話間隔持續(xù)時(shí)間之外,還有另一種類型的會(huì)話窗口,每個(gè)會(huì)話具有不同的間隔持續(xù)時(shí)間。我們稱之為“動(dòng)態(tài)間隙持續(xù)時(shí)間(dynamic gap duration)”。

          如果想及時(shí)了解Spark、Hadoop或者HBase相關(guān)的文章,歡迎關(guān)注微信公眾號(hào):過(guò)往記憶大數(shù)據(jù) 時(shí)間線下方的框表示每個(gè)事件及其間隔持續(xù)時(shí)間。一共有有四個(gè)事件,它們的(事件時(shí)間,間隔持續(xù)時(shí)間)對(duì)分別是(12:04,4 分鐘)藍(lán)色,(12:06,9 分鐘)橙色,(12:09,5 分鐘)黃色,以及( 12:15,5 分鐘)綠色。

          時(shí)間線上方的框表示由這些事件構(gòu)成的實(shí)際會(huì)話。您可以將每個(gè)事件視為一個(gè)單獨(dú)的會(huì)話,并將具有交集的會(huì)話合并為一個(gè)。正如您看到的,會(huì)話的時(shí)間范圍是會(huì)話中包含的所有事件的時(shí)間范圍的“聯(lián)合”。請(qǐng)注意,會(huì)話的結(jié)束時(shí)間不再是會(huì)話中最新事件的時(shí)間 + 間隙持續(xù)時(shí)間。

          新的 session_window 函數(shù)接收兩個(gè)參數(shù),事件時(shí)間列和間隙持續(xù)時(shí)間。

          對(duì)于動(dòng)態(tài)會(huì)話窗口,您可以在“session_window”函數(shù)中為“間隙持續(xù)時(shí)間”參數(shù)提供一個(gè)“表達(dá)式”。這個(gè)表達(dá)式應(yīng)該解析為一個(gè)間隔,比如“5分鐘”。由于“間隙持續(xù)時(shí)間”參數(shù)接收一個(gè)表達(dá)式,我們也可以使用 UDF。

          例如,基于 eventType 列的動(dòng)態(tài)間隙持續(xù)時(shí)間的會(huì)話窗口計(jì)數(shù)可以實(shí)現(xiàn)如下:

          # Define the session window having dynamic gap duration based on eventTypesession_window expr = session_window(events.timestamp, \    when(events.eventType == "type1", "5 seconds") \    .when(events.eventType == "type2", "20 seconds") \    .otherwise("5 minutes"))# Group the data by session window and userId, and compute the count of each groupwindowedCountsDF = events \    .withWatermark("timestamp", "10 minutes") \    .groupBy(events.userID, session_window_expr) \    .count()

          原生支持會(huì)話窗口與 FlatMapGroupsWithState 比較

          flatMapGroupsWithState 為實(shí)現(xiàn)會(huì)話窗口提供了更大的靈活性,但它需要用戶編寫(xiě)一堆代碼。例如,請(qǐng)參考 Apache Spark 上的 sessionization 示例,它通過(guò) flatMapGroupsWithState 實(shí)現(xiàn)會(huì)話窗口。請(qǐng)注意,Apache Spark 上的會(huì)話示例非常簡(jiǎn)化,僅適用于處理時(shí)間和附加模式。通過(guò)原生支持會(huì)話窗口,處理事件時(shí)間和各種輸出模式的總體復(fù)雜性被抽象了出來(lái)。

          Spark 將原生支持會(huì)話窗口設(shè)置為覆蓋一般用例的目標(biāo),因?yàn)樗?Spark 能夠優(yōu)化性能和狀態(tài)存儲(chǔ)。當(dāng)我們的業(yè)務(wù)用例需要一個(gè)復(fù)雜的會(huì)話窗口時(shí),我們可能仍然需要利用 flatMapGroupsWithState,例如,如果用例會(huì)話也應(yīng)該在特定類型的事件上關(guān)閉,而不管是否處于不活動(dòng)狀態(tài)。

          總結(jié)

          我們已經(jīng)介紹了流聚合中如何使用會(huì)話窗口,它也適用于批處理查詢。通過(guò)學(xué)習(xí)如何使用新的 session_window 函數(shù)',我們可以利用流數(shù)據(jù)聚合與時(shí)間窗口的知識(shí),并能夠處理會(huì)話窗口。這也使 SQL/PySpark 用戶能夠處理會(huì)話窗口,因?yàn)?flatMapGroupsWithState API 在 PySpark 中不可用并且不能表示為 SQL 語(yǔ)句。

          當(dāng)然,時(shí)間窗口的操作還有很多空間可以改進(jìn),在此之前還是需要使用 flatMapGroupsWithState API,社區(qū)計(jì)劃在不久的將來(lái)研究自定義窗口操作。

          本文翻譯自《Native Support of Session Window in Spark Structured Streaming》:https://databricks.com/blog/2021/10/12/native-support-of-session-window-in-spark-structured-streaming.html

          瀏覽 53
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天操夜夜爱 | 天天操天天爽天天撸一鲁‘ | 一区二区三区在线 | 欧 | 伊人久久大香线蕉综合色狠狠 | 日本日逼黄色 |