Spark 特性 | Spark Structured Streaming 2021年最新進展總結(jié)
本文我們將花點時間來回顧一下 Databricks 和 Apache Spark? 在流數(shù)據(jù)處理方面所取得的巨大進步!2021年,工程團隊和開源貢獻(xiàn)者在以下三個目標(biāo)取得了一些進展:
?降低延遲并改進有狀態(tài)流處理;?提高 Databricks 和 Spark Structured Streaming 工作負(fù)載的可觀測性;?改進資源分配和可伸縮性。
下面我們來簡單地看下這些目標(biāo)。
目標(biāo)一:降低延遲并改進有狀態(tài)流處理
有兩個新的關(guān)鍵特性專門用于降低有狀態(tài)操作的延遲,以及對有狀態(tài) APIs 的改進。第一種是針對大型有狀態(tài)操作的異步檢查點(asynchronous checkpointing),它改進了傳統(tǒng)的同步和延遲更高的設(shè)計。
異步檢查點

如果想及時了解Spark、Hadoop或者HBase相關(guān)的文章,歡迎關(guān)注微信公眾號:過往記憶大數(shù)據(jù)?
在這個模型中,狀態(tài)更新會在下一個微批開始之前寫到云存儲檢查點位置。這樣做的好處是,如果有狀態(tài)流查詢失敗,我們可以使用最后一個成功完成的批處理中的信息輕松地重新啟動查詢。在異步模型中,下一個微批不必等待狀態(tài)更新,從而提高了整個微批執(zhí)行的端到端延遲。
如果想及時了解Spark、Hadoop或者HBase相關(guān)的文章,歡迎關(guān)注微信公眾號:過往記憶大數(shù)據(jù)
任意狀態(tài)操作符的改進
在這篇文章中,社區(qū)用 [flat]MapGroupsWithState 在結(jié)構(gòu)化流中引入了任意狀態(tài)處理。這些操作符提供了很大的靈活性,并支持聚合之外的更高級的有狀態(tài)操作。在過去的一年,社區(qū)已經(jīng)對這些操作符進行了改進:
允許初始狀態(tài),避免重新處理所有流數(shù)據(jù);通過公開一個新的 TestGroupState 接口,允許用戶創(chuàng)建 GroupState 的實例,并訪問已設(shè)置的內(nèi)部值,簡化狀態(tài)轉(zhuǎn)換函數(shù)的單元測試,從而實現(xiàn)更簡單的邏輯測試。
允許初始狀態(tài)
讓我們從下面的 flatMapGroupswithState 操作符開始:
def flatMapGroupsWithState[S: Encoder, U: Encoder](outputMode: OutputMode,timeoutConf: GroupStateTimeout,initialState: KeyValueGroupedDataset[K, S])(func: (K, Iterator[V], GroupState[S]) => Iterator[U])
下面的例子計算每種水果的總數(shù):
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {val count = state.getOption.map(_.count).getOrElse(0L) + valList.sizestate.update(new RunningCount(count))Iterator((key, count.toString))}
現(xiàn)在,我們針對某些水果的計數(shù)設(shè)置初始值:
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(("apple", new RunningCount(1)),("orange", new RunningCount(2)),("mango", new RunningCount(5)),).toDS()val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)fruitStream.groupByKey(x => x).flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
簡單的邏輯測試
您現(xiàn)在還可以使用 TestGroupState API 測試狀態(tài)更新。
import org.apache.spark.sql.streaming._import org.apache.spark.api.java.Optionaltest("flatMapGroupsWithState's state update function") {var prevState = TestGroupState.create[UserStatus](optionalState = Optional.empty[UserStatus],timeoutConf = GroupStateTimeout.EventTimeTimeout,batchProcessingTimeMs = 1L,eventTimeWatermarkMs = Optional.of(1L),hasTimedOut = false)val userId: String = ...val actions: Iterator[UserAction] = ...assert(!prevState.hasUpdated)updateState(userId, actions, prevState)assert(prevState.hasUpdated)}
關(guān)于 TestGroupState 的更多例子可以參見這里。
內(nèi)置支持會話窗口(Session Windows)
Structured Streaming 引入了在基于事件時間的窗口上使用滾動或滑動窗口進行聚合的能力,這兩種窗口都是固定長度的窗口。在 Spark 3.2 中,社區(qū)引入了會話窗口的概念,它允許動態(tài)窗口長度。這需要使用 flatMapGroupsWithState 自定義狀態(tài)操作符。我們可以看下下面的例子:
# Define the session window having dynamic gap duration based on eventTypesession_window expr = session_window(events.timestamp, \when(events.eventType == "type1", "5 seconds") \.when(events.eventType == "type2", "20 seconds") \.otherwise("5 minutes"))# Group the data by session window and userId, and compute the count of each groupwindowedCountsDF = events \.withWatermark("timestamp", "10 minutes") \.groupBy(events.userID, session_window_expr) \.count()
關(guān)于會話窗口的介紹可以參見 Apache Spark 3.2 內(nèi)置支持會話窗口。
目標(biāo)二:提高 Databricks 和 Spark Structured Streaming 工作負(fù)載的可觀測性
雖然 StreamingQueryListener API 允許我們在 SparkSession 中異步監(jiān)視查詢,并為查詢狀態(tài)、進度和終止事件定義自定義回調(diào)函數(shù),但理解反壓(understanding back pressure)和推斷瓶頸在微批處理中的位置仍然是一個挑戰(zhàn)。從 Databricks Runtime 8.1 開始,StreamingQueryProgress 對象報告了 Kafka、Kinesis、Delta Lake 和 Auto Loader 流數(shù)據(jù)源的特定數(shù)據(jù)源背壓指標(biāo)(back pressure metrics)。
比如下面是 Kafka 數(shù)據(jù)源的 metrics 例子:
{"sources" : [ {"description" : "KafkaV2[Subscribe[topic]]","metrics" : {"avgOffsetsBehindLatest" : "4.0","maxOffsetsBehindLatest" : "4","minOffsetsBehindLatest" : "4","estimatedTotalBytesBehindLatest" : "80.0"},} ]}
關(guān)于上面新增指標(biāo)可以參見 SPARK-34854。注意,社區(qū)版好像沒有 estimatedTotalBytesBehindLatest 指標(biāo)。
另外,Databricks Runtime 8.3 引入了實時指標(biāo),以幫助理解 RocksDB 狀態(tài)存儲的性能,調(diào)試狀態(tài)操作的性能;這些還可以幫助識別異步檢查點的目標(biāo)工作負(fù)載。一個新的狀態(tài)存儲監(jiān)控的例子如下:
{"id" : "6774075e-8869-454b-ad51-513be86cfd43","runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5","batchId" : 7,"stateOperators" : [ {"numRowsTotal" : 20000000,"numRowsUpdated" : 20000000,"memoryUsedBytes" : 31005397,"numRowsDroppedByWatermark" : 0,"customMetrics" : {"rocksdbBytesCopied" : 141037747,"rocksdbCommitCheckpointLatency" : 2,"rocksdbCommitCompactLatency" : 22061,"rocksdbCommitFileSyncLatencyMs" : 1710,"rocksdbCommitFlushLatency" : 19032,"rocksdbCommitPauseLatency" : 0,"rocksdbCommitWriteBatchLatency" : 56155,"rocksdbFilesCopied" : 2,"rocksdbFilesReused" : 0,"rocksdbGetCount" : 40000000,"rocksdbGetLatency" : 21834,"rocksdbPutCount" : 1,"rocksdbPutLatency" : 56155599000,"rocksdbReadBlockCacheHitCount" : 1988,"rocksdbReadBlockCacheMissCount" : 40341617,"rocksdbSstFileSize" : 141037747,"rocksdbTotalBytesReadByCompaction" : 336853375,"rocksdbTotalBytesReadByGet" : 680000000,"rocksdbTotalBytesReadThroughIterator" : 0,"rocksdbTotalBytesWrittenByCompaction" : 141037747,"rocksdbTotalBytesWrittenByPut" : 740000012,"rocksdbTotalCompactionLatencyMs" : 21949695000,"rocksdbWriterStallLatencyMs" : 0,"rocksdbZipFileBytesUncompressed" : 7038}} ],"sources" : [ {} ],"sink" : {}}
關(guān)于這個功能可以參見 SPARK-36236。
目標(biāo)三:改進資源分配和可伸縮性
Streaming Autoscaling Delta Live Tables (DLT)
在去年的 Data + AI Summit 峰會上,數(shù)磚發(fā)布了 Delta Live Tables,這是一個框架,允許我們聲明式地構(gòu)建和編排數(shù)據(jù)管道,并在很大程度上抽象了配置集群和節(jié)點類型的需求。在過去的一邊這個功能得以提升,并為流管道引入了一個智能自動伸縮解決方案,該解決方案改進了現(xiàn)有的 Databricks 優(yōu)化自動伸縮(Databricks Optimized Autoscaling)。這些好處包括:
?更好地利用集群:新的算法利用新的背壓度量(back pressure metrics)來調(diào)整集群大小,以更好地處理流工作負(fù)載波動的場景,這最終導(dǎo)致更好的集群利用率。?主動優(yōu)雅的 Worker 關(guān)閉:現(xiàn)有的自動伸縮解決方案只有在節(jié)點空閑時才會關(guān)閉節(jié)點,而新的 DLT Autoscaler 會在利用率低時主動關(guān)閉選定的節(jié)點,同時保證不會因為關(guān)閉而導(dǎo)致任務(wù)失敗。
Trigger.AvailableNow
在 Structured Streaming 中,觸發(fā)器允許用戶定義流查詢數(shù)據(jù)處理的時間。這些觸發(fā)器類型可以是 micro-batch (默認(rèn))、fixed interval micro-batch (Trigger.ProcessingTime)、one-time micro-batch (Trigger.Once)和 continuous (Trigger.Continuous)。
Databricks Runtime 10.1 (對應(yīng)社區(qū)的 Spark 3.3.0 版本,參見 SPARK-36533)引入了一種新的觸發(fā)器:Trigger.AvailableNow,類似于 Trigger.Once ,但提供了更好的可伸縮性。與 Trigger Once 一樣,所有可用的數(shù)據(jù)都將在查詢停止之前處理,但是是以多批處理的方法來處理而不是一次處理所有的數(shù)據(jù),這有可能會導(dǎo)致 Driver 出現(xiàn) OOM。這個功能支持 Delta Lake 、Auto Loader 以及 Kafka(SPARK-36649)流數(shù)據(jù)源。下面是一個使用例子:
spark.readStream.format("delta").option("maxFilesPerTrigger", "1").load(inputDir).writeStream.trigger(Trigger.AvailableNow).option("checkpointLocation", checkpointDir).start()
本文翻譯自《Structured Streaming: A Year in Review》https://databricks.com/blog/2022/02/07/structured-streaming-a-year-in-review.html。
