Spark 特性 | Apache Spark 3.1 Structured Streaming 改進(jìn)
?提升了 Python 的可用性;?加強(qiáng)了 ANSI SQL 兼容性;?加強(qiáng)了查詢優(yōu)化;?Shuffle hash join 性能提升;?History Server 支持 structured streaming
更多詳情請參見這里。在這篇博文中,我們總結(jié)了3.1版本中 Spark Streaming 的顯著改進(jìn),包括新的流式表(streaming table)API、支持 stream-stream join 和多個 UI 增強(qiáng)。此外,模式驗證(schema validation)和對 Apache Kafka 數(shù)據(jù)源的改進(jìn)提供了更好的可用性。此外,F(xiàn)ileStream source/sink 也進(jìn)行了各種增強(qiáng),以提高讀/寫性能。
新的流式表 API
啟動 structured stream 時,連續(xù)數(shù)據(jù)流被認(rèn)為是無界表(unbounded table)。因此,Table APIs 提供了一種更自然、更方便的方法來處理流查詢。在 Spark 3.1 中,社區(qū)添加了對 DataStreamReader 和 DataStreamWriter 的支持。我們現(xiàn)在可以直接以表的形式使用這個 API 讀取和寫入流式 DataFrames。請參見下面的示例:
# Create a streaming DataFramesrc = spark.readStream.format("rate").option("rowPerSecond", 10).load()# Write the streaming DataFrame to a tablesrc.writeStream.option("checkpointLocation", checkpointLoc1).toTable("myTable")# Check the table resultspark.read.table("myTable").show(truncate=30)+-----------------------+-----+|timestamp |value|+-----------------------+-----+|2021-01-19 07:45:23.122|42 ||2021-01-19 07:45:23.222|43 ||2021-01-19 07:45:23.322|44 |...
此外,通過這些新功能,用戶可以轉(zhuǎn)換源數(shù)據(jù)集并寫入到一張新表:
# Write to a new table with transformationspark.readStream.table("myTable").select("value") \.writeStream.option("checkpointLocation", checkpointLoc2) \.format("parquet").toTable("newTable")# Check the table resultspark.read.table("newTable").show()+-----+|value|+-----+| 1214|| 1215|| 1216|...
Databricks 推薦在 streaming table APIs 中使用 Delta Lake 格式,因為這種格式將帶來以下好處:
?并發(fā)壓縮由低延遲場景產(chǎn)生的小文件;?多個流作業(yè)(或并發(fā)批處理作業(yè))支持“僅且一次”(exactly-once)處理;?當(dāng)使用文件作為流的源時,可以有效地發(fā)現(xiàn)哪些文件是新的。
stream-stream 支持更多 Join 類型
在 Spark 3.1 之前,stream-stream join 只支持 inner、left outer 以及 right outer joins。在最新的版本中,社區(qū)實(shí)現(xiàn)了完整的 full outer 以及 left semi stream-stream join,使 Structured Streaming 支持更多的場景。
?Left semi stream-stream join (SPARK-32862)?Full outer stream-stream join (SPARK-32863)
Kafka 數(shù)據(jù)源性能提升
在 Spark 3.1 中,社區(qū)已經(jīng)將 Kafka 依賴升級到 2.6.0 (SPARK-32568),這使得用戶可以遷移到 Kafka offsets retrieval 新的 API(AdminClient.listOffsets)。它解決了使用舊版本時 Kafka 連接器無限等待的問題 (SPARK-28367)。
模式校驗
模式是 Structured Streaming 查詢的基本信息。在 Spark 3.1 中,社區(qū)為用戶輸入的模式和內(nèi)部存儲的模式添加了模式驗證邏輯:
在查詢重啟中引入狀態(tài)模式驗證(SPARK-27237)
通過此更新,鍵和值的模式將存儲在 stream 啟動時的模式文件(schema files)中。然后,在重新啟動查詢時,根據(jù)現(xiàn)有的鍵和值模式驗證新的鍵和值模式的兼容性。當(dāng)字段的數(shù)量相同且每個字段的數(shù)據(jù)類型相同時,狀態(tài)模式被認(rèn)為是“兼容的”。注意,這里不會檢查字段名,因為 Spark 允許重命名。
這將阻止使用不兼容狀態(tài)模式的查詢運(yùn)行,從而減少不確定性行為的概率,并提供在錯誤的時候更多的信息。
為流狀態(tài)存儲引入模式驗證(SPARK-31894)
以前,Structured Streaming 直接將檢查點(diǎn)(用UnsafeRow表示)放到 StateStore 中,而不需要任何模式驗證。當(dāng)升級到新的 Spark 版本時,檢查點(diǎn)文件將被重用。如果沒有模式驗證,任何與聚合函數(shù)相關(guān)的更改或 bug 修復(fù)都可能導(dǎo)致隨機(jī)異常,甚至產(chǎn)生錯誤的結(jié)果(參見 SPARK-28067)。現(xiàn)在 Spark 將檢驗檢查點(diǎn)里面的模式,并在遷移過程中重用檢查點(diǎn)時拋出 InvalidUnsafeRowException。
Structured Streaming UI 方面的加強(qiáng)
社區(qū)在 Spark 3.0 中引入了新的 Structured Streaming UI。在 Spark 3.1 中,社區(qū)在 Structured Streaming UI 中添加了對歷史記錄服務(wù)器的支持(Structured Streaming UI(),以及更多關(guān)于 streaming 運(yùn)行時狀態(tài)的信息,具體如下:
Structured Streaming UI 中的狀態(tài)信息 (SPARK-33223)
狀態(tài)信息中添加了四個度量信息:
?Aggregated Number Of Total State Rows?Aggregated Number Of Updated State Rows?Aggregated State Memory Used In Bytes?Aggregated Number Of State Rows Dropped By Watermark
有了這些指標(biāo),我們就可以了解狀態(tài)存儲的整體情況。而且根據(jù)這些信息我們還可以評估是否需要擴(kuò)容。

Structured Streaming UI 中 Watermark gap 信息 (SPARK-33224)
Watermark 是狀態(tài)查詢中用戶需要跟蹤的主要指標(biāo)之一。它定義了附加模式(append mode)的輸出“何時”發(fā)出,因此知道 wall clock 和水印(輸入數(shù)據(jù))之間的差距對于設(shè)置輸出期望非常有幫助。

Structured Streaming UI 中自定義指標(biāo)信息(SPARK-33287)
下面顯示了在配置 spark.sql.streaming.ui.enabledCustomMetricList 中設(shè)置的自定義度量信息:

FileStreamSource/Sink 方面的加強(qiáng)
FileStreamSource/Sink 主要有以下幾個方面的加強(qiáng)。
Cache fetched list of files beyond maxFilesPerTrigger as unread files (SPARK-30866)
以前,當(dāng)設(shè)置了 maxFilesPerTrigger 配置時,F(xiàn)ileStreamSource 將獲取所有可用的文件,根據(jù)配置處理有限數(shù)量的文件,并在每個微批處理時忽略其他文件。通過這個改進(jìn),它將緩存以前批次中獲取的文件,并在接下來的批次中處理它們。
Streamline the logic on file stream source and sink metadata log (SPARK-30462)
在此更改之前,每當(dāng)需要 FileStreamSource/Sink 中的元數(shù)據(jù)時,元數(shù)據(jù)日志中的所有信息都被反序列化到 Spark 驅(qū)動程序的內(nèi)存中。通過這個更改,Spark 將盡可能以流式(streamlined)的方式讀取和處理元數(shù)據(jù)日志。
Provide a new option to have retention on output files (SPARK-27188)
FileStreamSink 中有一個新選項用于配置元數(shù)據(jù)日志文件的保留,這有助于限制長時間運(yùn)行的 Structured Streaming 查詢的元數(shù)據(jù)日志文件大小的增長。
未來的計劃
在下一個主要版本中,社區(qū)將繼續(xù)關(guān)注 Spark Structured Streaming 的新功能、性能和可用性改進(jìn)。如果大家在使用過程中有任何方面的問題可以直接到社區(qū)反饋。
本文翻譯自《What’s New in Apache Spark? 3.1 Release for Structured Streaming》https://databricks.com/blog/2021/04/27/whats-new-in-apache-spark-3-1-release-for-structured-streaming.html
