<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark 特性 | Apache Spark 3.1 Structured Streaming 改進(jìn)

          共 4344字,需瀏覽 9分鐘

           ·

          2021-05-19 01:37

          Apache Spark 3.1.x 版本發(fā)布到現(xiàn)在已經(jīng)過了兩個多月了,這個版本繼續(xù)保持使得 Spark 更快,更容易和更智能的目標(biāo),Spark 3.1 的主要目標(biāo)如下:

          ?提升了 Python 的可用性;?加強(qiáng)了 ANSI SQL 兼容性;?加強(qiáng)了查詢優(yōu)化;?Shuffle hash join 性能提升;?History Server 支持 structured streaming 
          更多詳情請參見這里。在這篇博文中,我們總結(jié)了3.1版本中 Spark Streaming 的顯著改進(jìn),包括新的流式表(streaming table)API、支持 stream-stream join 和多個 UI 增強(qiáng)。此外,模式驗證(schema validation)和對 Apache Kafka 數(shù)據(jù)源的改進(jìn)提供了更好的可用性。此外,F(xiàn)ileStream source/sink 也進(jìn)行了各種增強(qiáng),以提高讀/寫性能。


          新的流式表 API

          啟動 structured stream 時,連續(xù)數(shù)據(jù)流被認(rèn)為是無界表(unbounded table)。因此,Table APIs 提供了一種更自然、更方便的方法來處理流查詢。在 Spark 3.1 中,社區(qū)添加了對 DataStreamReader 和 DataStreamWriter 的支持。我們現(xiàn)在可以直接以表的形式使用這個 API 讀取和寫入流式 DataFrames。請參見下面的示例:

          # Create a streaming DataFramesrc = spark.readStream.format("rate").option("rowPerSecond", 10).load()# Write the streaming DataFrame to a tablesrc.writeStream.option("checkpointLocation", checkpointLoc1).toTable("myTable")# Check the table resultspark.read.table("myTable").show(truncate=30)+-----------------------+-----+|timestamp              |value|+-----------------------+-----+|2021-01-19 07:45:23.122|42   ||2021-01-19 07:45:23.222|43   ||2021-01-19 07:45:23.322|44   |...

          此外,通過這些新功能,用戶可以轉(zhuǎn)換源數(shù)據(jù)集并寫入到一張新表:

          # Write to a new table with transformationspark.readStream.table("myTable").select("value") \  .writeStream.option("checkpointLocation", checkpointLoc2) \  .format("parquet").toTable("newTable")# Check the table resultspark.read.table("newTable").show()+-----+|value|+-----+| 1214|| 1215|| 1216|...

          Databricks 推薦在 streaming table APIs 中使用 Delta Lake 格式,因為這種格式將帶來以下好處:

          ?并發(fā)壓縮由低延遲場景產(chǎn)生的小文件;?多個流作業(yè)(或并發(fā)批處理作業(yè))支持“僅且一次”(exactly-once)處理;?當(dāng)使用文件作為流的源時,可以有效地發(fā)現(xiàn)哪些文件是新的。

          stream-stream 支持更多 Join 類型

          在 Spark 3.1 之前,stream-stream join 只支持 inner、left outer 以及 right outer joins。在最新的版本中,社區(qū)實(shí)現(xiàn)了完整的 full outer 以及 left semi stream-stream join,使 Structured Streaming 支持更多的場景。

          ?Left semi stream-stream join (SPARK-32862)?Full outer stream-stream join (SPARK-32863)

          Kafka 數(shù)據(jù)源性能提升

          在 Spark 3.1 中,社區(qū)已經(jīng)將 Kafka 依賴升級到 2.6.0 (SPARK-32568),這使得用戶可以遷移到 Kafka offsets retrieval 新的 API(AdminClient.listOffsets)。它解決了使用舊版本時 Kafka 連接器無限等待的問題 (SPARK-28367)。


          模式校驗

          模式是 Structured Streaming 查詢的基本信息。在 Spark 3.1 中,社區(qū)為用戶輸入的模式和內(nèi)部存儲的模式添加了模式驗證邏輯:

          在查詢重啟中引入狀態(tài)模式驗證(SPARK-27237)

          通過此更新,鍵和值的模式將存儲在 stream 啟動時的模式文件(schema files)中。然后,在重新啟動查詢時,根據(jù)現(xiàn)有的鍵和值模式驗證新的鍵和值模式的兼容性。當(dāng)字段的數(shù)量相同且每個字段的數(shù)據(jù)類型相同時,狀態(tài)模式被認(rèn)為是“兼容的”。注意,這里不會檢查字段名,因為 Spark 允許重命名。

          這將阻止使用不兼容狀態(tài)模式的查詢運(yùn)行,從而減少不確定性行為的概率,并提供在錯誤的時候更多的信息。

          為流狀態(tài)存儲引入模式驗證(SPARK-31894)

          以前,Structured Streaming 直接將檢查點(diǎn)(用UnsafeRow表示)放到 StateStore 中,而不需要任何模式驗證。當(dāng)升級到新的 Spark 版本時,檢查點(diǎn)文件將被重用。如果沒有模式驗證,任何與聚合函數(shù)相關(guān)的更改或 bug 修復(fù)都可能導(dǎo)致隨機(jī)異常,甚至產(chǎn)生錯誤的結(jié)果(參見 SPARK-28067)。現(xiàn)在 Spark 將檢驗檢查點(diǎn)里面的模式,并在遷移過程中重用檢查點(diǎn)時拋出 InvalidUnsafeRowException。


          Structured Streaming UI 方面的加強(qiáng)

          社區(qū)在 Spark 3.0 中引入了新的 Structured Streaming UI。在 Spark 3.1 中,社區(qū)在 Structured Streaming UI 中添加了對歷史記錄服務(wù)器的支持(Structured Streaming UI(),以及更多關(guān)于 streaming 運(yùn)行時狀態(tài)的信息,具體如下:

          Structured Streaming UI 中的狀態(tài)信息 (SPARK-33223)

          狀態(tài)信息中添加了四個度量信息:

          ?Aggregated Number Of Total State Rows?Aggregated Number Of Updated State Rows?Aggregated State Memory Used In Bytes?Aggregated Number Of State Rows Dropped By Watermark

          有了這些指標(biāo),我們就可以了解狀態(tài)存儲的整體情況。而且根據(jù)這些信息我們還可以評估是否需要擴(kuò)容。

          Structured Streaming UI 中 Watermark gap 信息 (SPARK-33224)

          Watermark 是狀態(tài)查詢中用戶需要跟蹤的主要指標(biāo)之一。它定義了附加模式(append mode)的輸出“何時”發(fā)出,因此知道 wall clock 和水印(輸入數(shù)據(jù))之間的差距對于設(shè)置輸出期望非常有幫助。

          Structured Streaming UI 中自定義指標(biāo)信息(SPARK-33287)

          下面顯示了在配置 spark.sql.streaming.ui.enabledCustomMetricList 中設(shè)置的自定義度量信息:


          FileStreamSource/Sink 方面的加強(qiáng)

          FileStreamSource/Sink 主要有以下幾個方面的加強(qiáng)。

          Cache fetched list of files beyond maxFilesPerTrigger as unread files (SPARK-30866)

          以前,當(dāng)設(shè)置了 maxFilesPerTrigger 配置時,F(xiàn)ileStreamSource 將獲取所有可用的文件,根據(jù)配置處理有限數(shù)量的文件,并在每個微批處理時忽略其他文件。通過這個改進(jìn),它將緩存以前批次中獲取的文件,并在接下來的批次中處理它們。

          Streamline the logic on file stream source and sink metadata log (SPARK-30462)

          在此更改之前,每當(dāng)需要 FileStreamSource/Sink 中的元數(shù)據(jù)時,元數(shù)據(jù)日志中的所有信息都被反序列化到 Spark 驅(qū)動程序的內(nèi)存中。通過這個更改,Spark 將盡可能以流式(streamlined)的方式讀取和處理元數(shù)據(jù)日志。

          Provide a new option to have retention on output files (SPARK-27188)

          FileStreamSink 中有一個新選項用于配置元數(shù)據(jù)日志文件的保留,這有助于限制長時間運(yùn)行的 Structured Streaming 查詢的元數(shù)據(jù)日志文件大小的增長。


          未來的計劃

          在下一個主要版本中,社區(qū)將繼續(xù)關(guān)注 Spark Structured Streaming 的新功能、性能和可用性改進(jìn)。如果大家在使用過程中有任何方面的問題可以直接到社區(qū)反饋。

          本文翻譯自《What’s New in Apache Spark? 3.1 Release for Structured Streaming》https://databricks.com/blog/2021/04/27/whats-new-in-apache-spark-3-1-release-for-structured-streaming.html

          瀏覽 89
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  无码人妻精品一区二区蜜桃网站文 | 青榴视频在线观看 | 五月婷婷五月天 | 九色视频在线观看 | 免费观看a∨视频 |