<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          當Spark遇上Zeppelin?

          共 4840字,需瀏覽 10分鐘

           ·

          2021-10-24 17:18

          隨著數(shù)據(jù)時代和 AI 時代的到來,幾乎所有的企業(yè)都在建設(shè)自己的大數(shù)據(jù)系統(tǒng)。為了提高數(shù)據(jù)處理能力,突破單機在計算與資源上的瓶頸,諸如 Hadoop、Spark、Flink 等分布式計算框架和基于 HDFS 的分布式存儲系統(tǒng)成為大多數(shù)選擇。實際工作中,大部分時間我們都在研發(fā)、部署和維護批處理、流處理程序,完成相應(yīng)的業(yè)務(wù)需求,但是相信很多人都遇到過這樣的事情:

          • 需要對一批數(shù)據(jù)做探索性分析,所謂探索性即尚且沒有明確的思路,需要不斷的嘗試,這時你無法形成完整方案寫到代碼文件、打包、正式部署。
          • 臨時有個任務(wù)需要驗證一下,特別針對研發(fā)人員,你為這個任務(wù)寫個正式代碼文件、打包、部署顯然過于繁瑣,并且很多線上環(huán)境是不允許隨便傳代碼的。


          這里的問題就是,如何在分布式計算框架之上實現(xiàn)交互式運行代碼?Notebook 顯然成為首選。Notebook 是一類基于 Web 的交互式數(shù)據(jù)分析工具,比較流行的有 Jupyter、Zeppelin 等。Jupyter 是基于 Python 的,前身是 IPython,在單機數(shù)據(jù)分析上表現(xiàn)非常優(yōu)異,特別是結(jié)合 pandas 庫。而 Zeppelin 則以插件的形式對大多數(shù)分布式計算引擎提供了友好的支持,尤其是 Spark。


          本文重點探討Spark with Zeppelin,即在 Zeppelin 中通過 Spark 計算引擎進行交互式數(shù)據(jù)分析。文章假定讀者對 Spark 已經(jīng)有一定的認知,主要圍繞 Zeppelin 展開,由淺入深包含以下幾方面:


          • 介紹 Zeppelin 的基本概念

          • 介紹如何在 Zeppelin 中通過 Spark 進行數(shù)據(jù)分析

          • 探討如何為 Zeppelin 下的 Spark 配置資源

          • 探討 Zeppelin 與 Spark 進行交互的原理


          需要說明的是,筆者使用的測試環(huán)境為 AWS EMR 5.20.0(Spark 2.4.0 + Zeppelin 0.8.0)。


          基本概念

          Zeppelin 的核心概念可歸為兩類:一類與執(zhí)行引擎相關(guān),一類與 Web UI 相關(guān)。


          Zeppelin 本身并不提供任何數(shù)據(jù)處理功能,而是充分利用第三方執(zhí)行引擎。為了能夠靈活的接入這些執(zhí)行引擎,Zeppelin 引入了 Interpreter 的概念。一個 Interpreter 對應(yīng)一種執(zhí)行引擎,用戶可以根據(jù)需要來使用相應(yīng)的功能。以 Spark 為例,其有 Spark(Scala)、pySpark、SparkSQL、SparkR 多種執(zhí)行方式,每一種都有一個與之對應(yīng)的 Zeppelin Interpreter,而這些 Interpreter 就構(gòu)成了 Spark Interpreter Group。


          在 Zeppelin 的 Web UI 上,可以根據(jù)需要創(chuàng)建自己的 Note,一個 Note 即一個新的文檔,用于編輯與執(zhí)行代碼、可視化結(jié)果等。在 Note 中,為了便于管理,通常會將代碼分段,每個片段只做一個功能,稱為 Paragraph。在 Paragraph 中,需要首先指定該片段所用的 Interpreter,然后根據(jù)該 Interpreter 對應(yīng)的編程語言來編寫代碼。比如在下右圖中,指定了 pyspark 的 Interpreter,就需要用 Python 來編寫代碼。


          使用方法


          本節(jié)以一個示例來介紹如何在 Zeppelin 中使用 Spark SQL 進行數(shù)據(jù)分析,使用的數(shù)據(jù)來源于 2007 年美國航班延誤信息,文件為 CSV 格式,約 745 萬條數(shù)據(jù)。如需練習,請先下載數(shù)據(jù)并上傳到 HDFS/S3 中。該示例主要包括以下幾部分內(nèi)容:


          • 第一,加載數(shù)據(jù)到 Spark 中。使用 pyspark (Interpreter)將 CSV 數(shù)據(jù)文件加載到 Spark 中,允許其根據(jù) Header 自動推導 Schema,生成一個 DataFrame。需要說明的是,第一次運行時,會啟動 Spark,從而花費一定的時間。

          • 第二,創(chuàng)建 Table。根據(jù) DataFrame 創(chuàng)建臨時表,為后面使用 Spark SQL 進行分析做準備。

          • 第三,使用 Spark SQL 進行數(shù)據(jù)分析與可視化。通過sql (Interpreter) 可直接使用 SQL 語言進行數(shù)據(jù)分析,默認是將數(shù)據(jù)結(jié)果以表格的形式打印出來,可根據(jù)需要選擇相應(yīng)的圖形進行可視化。

          • 第四,編寫自己的 UDF。所謂 UDF,就是自定義的函數(shù),可將一個或多個列的值作為輸入,完成特定的計算功能。可根據(jù)需要定義相應(yīng)的 UDF 函數(shù),并注冊到 Spark 中,之后即可在 SQL 中使用。

          • 第五,將分析結(jié)果保存下來。通過 sql 分析的結(jié)果會直接返回到 UI 上,如果希望將其保存下來,則需要通過 pyspark 將結(jié)果輸出到 DataFrame 中,從而調(diào)用相應(yīng)的保存函數(shù),將數(shù)據(jù)保存到相應(yīng)格式的文件中,比如 Parquet。




          資源配置


          使用 Spark 作為計算引擎,自然要根據(jù)數(shù)據(jù)規(guī)模和計算性能來分配合適的資源,避免出現(xiàn) OOM、執(zhí)行慢等問題。如何充分利用有限集群資源進行數(shù)據(jù)計算,是我們在使用 Spark 過程中需要考慮的重要問題,而這個問題又可以拆分為兩個子問題:如何把集群資源都分配掉,如何把分配的資源都用起來。這些與 Spark 相關(guān)的話題就不在這里展開敘述,筆者將另起博文來談。

          本節(jié)主要結(jié)合上述的第一個子問題來談?wù)?Zeppelin 下 Spark 的資源配置問題,主要為兩塊內(nèi)容:一個是概括性的聊一聊怎么分配資源,另一個是如何將擬定的參數(shù)透過 Zeppelin 設(shè)置到 Spark 中。

          資源的分配主要考慮 CPU 和內(nèi)存,前者決定了并行計算能力,后者決定了數(shù)據(jù)的存取速度。以 Spark on YARN Cluster 為例,由 YARN 負責系統(tǒng)資源調(diào)度,Spark Driver 負責任務(wù)的調(diào)度,將任務(wù)分發(fā)到各個 Executor 中執(zhí)行。在資源分配上,需要分別考慮到這三個角色:


          • YARN 可調(diào)度的資源有多少?單臺機器上的資源并不能都交給 YARN 來調(diào)度,因為 OS 等其他系統(tǒng)也需要占用一定的資源,具體可調(diào)度的資源由 yarn.nodemanager.resource.cpu-vcores 和 yarn.nodemanager.resource.memory-mb 來配置。

          • 為 Spark Driver 分配資源。內(nèi)存是 Driver 需要關(guān)注的重點。如果 Spark 的 Action 會導致大量的數(shù)據(jù)需要返回到 Driver 中,就要考慮增加內(nèi)存。比如,將大量執(zhí)行結(jié)果收集過來,又或者加載的 Parquet 文件有大量的 Partition 信息需要緩存。

          • 為 Spark Executor 分配資源。任務(wù)分發(fā)到 Executor 中執(zhí)行時,可并行執(zhí)行的個數(shù)取決于整體 Executor 有多少個 vcore,即(Executor 個數(shù) * 單個 Executor 的 vcore 數(shù)),而任務(wù)執(zhí)行過程中所消耗的內(nèi)存也由 Executor 來承擔。


          在有限的資源下,分配工作可能并不是一蹴而就的,需要調(diào)整多次才能滿足需求。綜合來看,需要重點考慮四個參數(shù)(其他細節(jié)不在這里介紹):spark.driver.memory、spark.executor.cores、spark.executor.memory、spark.executor.instances。原則是:通過合理組合,盡可能將單機可分配的資源都用掉。


          這些參數(shù)可通過 spark-submit 命令行傳入,或者在代碼中設(shè)置到 SparkConf 中。但是 Zeppelin 下的 Spark 是在第一次執(zhí)行代碼時由 Zeppelin 自動啟動起來的,那么如何將這些配置設(shè)定下去呢?事實上,每個 Zeppelin Interpreter 都可以有自己的配置項,包含在 Interpreter Setting 中,其內(nèi)容保存在 interpreter.json 文件,并可通過 Web UI 來修改,如下圖所示。Spark Interpreter 會在啟動 Spark 時,將這些參數(shù)設(shè)置到 SparkConf 中。


          工作原理

          有了上面的認識,基本可以較好的使用 Zeppelin 了。但是,工程師的好奇心不會就此停止。在 Zeppelin 的 Web UI 中編寫的代碼,是如何提交到 Spark 中執(zhí)行的?所謂 Interpreter 究竟是怎么工作的?要搞清楚這些問題,閱讀源碼自然是最佳途徑。從ZeppelinRestApi.runParagraph到Paragraph.jobRun再到Spark Interpreter,一路讀下去即可。本節(jié)將對相關(guān)工作原理做一個簡單的介紹,源碼部分讀者可以根據(jù)個人興趣選擇性閱讀。


          如下圖所示,整個系統(tǒng)分為三塊:Zeppelin Web Client、Zeppelin Server 和 Spark,前兩個為 Zeppelin 的前后端模塊,第三個為計算引擎。要搞清楚整體的工作原理,需要搞清楚兩個問題:第一,這三者之間是如何通信的?第二,Interpreter 是如何被調(diào)用的?


          在 Spark 中,Driver 負責任務(wù)的調(diào)度。在 Zeppelin Web Client 上編寫的代碼片段只有傳送到 Driver 進程中,才能得以執(zhí)行。首先,Zeppelin Web Client 通過 REST API 與 Server 端進行交互,Server 收到后將其提交到任務(wù)隊列中順序執(zhí)行。其次,Zeppelin Server 與 Spark Driver 是不同的進程,并且很可能不在同一臺機器上,要實現(xiàn)這樣的交互,自然少不了 RPC 通信,這里采用了 Thrift。Spark Driver 所在的進程本身是一個 Thrift Server 進程,由 Zeppelin Server 發(fā)起 RPC 調(diào)用,將相關(guān)信息傳遞過來。


          每一個計算引擎都有自己對應(yīng)的 Interpreter 類,實現(xiàn)了 interpret(String st, InterpreterContext context) 方法,用于執(zhí)行具體代碼。對于每一個 Paragraph 的執(zhí)行,都會根據(jù) NoteId 和 Interpreter 名字來獲取對應(yīng)的 Interpreter 實例。如果是第一次執(zhí)行,便會根據(jù)配置創(chuàng)建對應(yīng)的 Interpreter Group 及相關(guān)的 Interpreter 實例。在 Zeppelin Server 中,Interpreter 是由 RemoteInterpreter 來代理的(代理模式),其集成了 Thrift Client 相關(guān)接口,用于 RPC 調(diào)用。第一次執(zhí)行時,Zeppelin Server 會啟動 Spark,Spark 的主進程(Driver)入口是 RemoteInterpreterServer,其集成了 Thrift Server 相關(guān)接口,用于 RPC 調(diào)用。在 RPC 調(diào)用中,會獲取對應(yīng)的 Interpreter 實例來完成具體執(zhí)行。


          結(jié)束語


          坦白地說,筆者并不是 Zeppelin 高手,只是在工作中遇到了文章開頭提到的問題時才開始使用 Zeppelin,而在使用過程中又遇到些問題并加以解決,后來因為個人興趣看了些源碼,于是才有了這篇博文。期望對感興趣的朋友有所幫助。


          作者:Mr-Bruce

          原文:https://blog.csdn.net/zwgdft/article/details/86417429

          瀏覽 58
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月天婷婷激情网 | 国产精品毛片 | 亚洲天堂成人 | 国产成人+综合亚洲+天堂 | 欧美一级黄色录像在线视频官网 |