<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark Yarn Client模式解析

          共 8105字,需瀏覽 17分鐘

           ·

          2023-08-20 03:47

          最近在定位Yarncrash問題時(shí),順便把spark怎么使用yarn的好好的梳理了一遍。不過我先了解一下Yarn和怎么提交yarnjob的。

          首先我們先看看Yarn的架構(gòu):


          a2650560ed0a28e29c50b6912ee932db.webp


          1 Yarn 分布式架構(gòu)

          • ResourceManager

          • 一個(gè)純粹的調(diào)度器

          • 根據(jù)應(yīng)用程序的資源請求嚴(yán)格限制系統(tǒng)的可用資源

          • 在保證容量、公平性及服務(wù)等級的情況下,優(yōu)化集群資源利用率,讓所有資源都得到充分的利用

          • 由可插拔的調(diào)度器來應(yīng)用不同的調(diào)度算法,如注重容量調(diào)度還是注意公平調(diào)度

           

          • ApplicationManager

          • 負(fù)責(zé)與ResourceManager協(xié)商資源,并和NodeManager協(xié)同工作來執(zhí)行和監(jiān)控Container以及他們的資源消耗

          • 有責(zé)任與ResourceManager協(xié)商并獲取合適的資源Container,跟蹤他們的狀態(tài),以及監(jiān)控其進(jìn)展

          • 在真實(shí)環(huán)境中,每一個(gè)應(yīng)用都有自己的ApplicationMaster的實(shí)例,但是也可為一組提供一個(gè)ApplicationMaster,比如Pig或者HiveApplicationMaster

           

          一、 YarnClient 編寫

          1. 創(chuàng)建Yarn客戶端

                
                  YarnClient yarnClient =YarnClient.createYarnClien。t();
                
                
                  
                    yarnClient.init(conf);
                  
                
                
                  
                    yarnClient.start();
                  
                
              


           

          2. 創(chuàng)建Yarn應(yīng)用

                
                  YarnClientApplication app = yarnClient.createApplication();
                
              

           

          3. 設(shè)置Applicaton 的名字,內(nèi)存 cpu 需求 以及 優(yōu)先級和 Queue 信息, YARN將根據(jù)這些信息來調(diào)度AppMaster

                
                  app.getApplicationSubmissionContext().setApplicationName("jenkins.ApplicationMaster");
                
                
                  app.getApplicationSubmissionContext().setResource(Resource.newInstance(100,1));
                
                
                  app.getApplicationSubmissionContext().setPriority(Priority.newInstance(0));
                
                
                  app.getApplicationSubmissionContext().setQueue("default");
                
              


           

          4. 設(shè)置ContainerLaunchContext,這一步, amContainer中包含了App Master 執(zhí)行需要的資源文件,環(huán)境變量 啟動命令,這里將資源文件上傳到了HDFS,這樣在NODE Manager 就可以通過 HDFS取得這些文件

                
                  app.getApplicationSubmissionContext().setAMContainerSpec(amContainer);
                
              

           

          5. 提交應(yīng)用

                
                  ApplicationId appId =yarnClient.submitApplication(app.getApplicationSubmissionContext());
                
              

           

          二、YARNApplicationMaster 編寫

          ApplicationMaster編寫的編寫比較復(fù)雜,其需要通Resource Manager Node Manager 交互,

          通過ResourceManager:申請Container,并接收Resource Manager的一些消息,如可用的Container,結(jié)束的Container等。

          通過NodeManage: 啟動Container,并接收Node Manage 的一些消息,如Container的狀態(tài)變化以及Node狀態(tài)變化。

           

          1. 創(chuàng)建一個(gè)AMRMClientAsync對象,這個(gè)對象負(fù)責(zé)與ResourceManager 交互

                  amRMClient= AMRMClientAsync.createAMRMClientAsync( 1000, new RMCallbackHandler());

                   這里的RMCallbackHandler是我們編寫的繼承自AMRMClientAsync.CallbackHandler 的一個(gè)類,其功能是處理由ResourceManager收到的消息,

                   其需要實(shí)現(xiàn)的方法由如下

                   

                
                  public void onContainersCompleted(List<ContainerStatus> statuses);
                
                
                  public void onContainersAllocated(List<Container> containers) ;
                
                
                  public void onShutdownRequest() ;
                
                
                  public void onNodesUpdated(List<NodeReport> updatedNodes) ;
                
                
                  public void onError(Throwable e) ;
                
              

           

                   這里不考慮異常的情況下,只寫onContainersAllocated, onContainersCompleted 這兩個(gè)既可以, 一個(gè)是當(dāng)有新的Container 可以使用, 一個(gè)是Container 運(yùn)行結(jié)束。

                   onContainersAllocated我們需要編寫啟動container的代碼,amNMClient.startContainerAsync(container, ctx); 這里的ctx Yarn Client 中第4步中的amContainer是同一個(gè)類型, 即這個(gè)container 執(zhí)行的一些資源,環(huán)境變量與命令等, 因?yàn)檫@是在回調(diào)函數(shù)中,為了保證時(shí)效性,這個(gè)操作最好放在線程池中異步操作。

                   onContainersCompleted中,如果是失敗的Container,我們需要重新申請并啟動Container,(這一點(diǎn)有可能是YARN Fair Schedule 中會強(qiáng)制退出某些Container 以釋放資源)成功的將做記錄既可以。

            

          2. 創(chuàng)建一個(gè)NMClientAsyncImpl對象,這個(gè)對象負(fù)責(zé)與NodeManager 交互

                   amNMClient= new NMClientAsyncImpl(new NMCallbackHandler());

                   這里NMCallbackHandler使我們需要編寫的繼承自NMClientAsync.CallbackHandler 的對象,其功能是處理由NodeManager 收到的消息

                
                  public void onContainerStarted(ContainerId containerId,  Map<String, ByteBuffer> allServiceResponse);
                
                
                  public void onContainerStatusReceived(ContainerId containerId,  ContainerStatus containerStatus);
                
                
                  public void onContainerStopped(ContainerId containerId) ;
                
                
                  public void onStartContainerError(ContainerId containerId, Throwable t);
                
                
                  public void onGetContainerStatusError(ContainerId containerId,  Throwable t) ;
                
                
                  public void onStopContainerError(ContainerId containerId, Throwable t);
                
              

                   這里簡單的不考慮異常的情況下,這些函數(shù)可以寫一個(gè)空函數(shù)體,忽略掉處理

           

          3. ApplicationMaster注冊到ResourceManager

                   RegisterApplicationMasterResponseresponse = amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1,"");

              這個(gè)函數(shù)將自己注冊到RM上,這里沒有提供RPCport TrackURL.

             

          4. ApplicationMasterResourceManager申請Container

            

                
                  ContainerRequestcontainerAsk = new ContainerRequest(
                
                
                                           //100*10M + 1vcpu
                
                
                                          Resource.newInstance(100, 1), null, null,
                
                
                                          Priority.newInstance(0));
                
                
                     amRMClient.addContainerRequest(containerAsk);
                
              

                 


              這里一個(gè)containerAsk表示申請一個(gè)Container,這里的對nodesrasks設(shè)置為NULL,猜測MapReduce應(yīng)該由參數(shù)來嘗試申請靠近HDFSblockcontainer

                  

          5. 申請到Container后,回調(diào)AMRMClientAsync.CallbackHandleronContainersAllocated就會響應(yīng),然后通過amNMClientContainer運(yùn)行計(jì)算任務(wù):

                   

                
                  List<String>commands = new LinkedList<String>();
                
                
                  commands.add("sleep"+ sleepSeconds.addAndGet(1));
                
                
                  ContainerLaunchContextctx = ContainerLaunchContext.newInstance(null, null, commands, null, null,null);
                
                
                  amNMClient.startContainerAsync(container,ctx);
                
              

           

          6. 等待Container 執(zhí)行完畢,清理退出

                   我的代碼如下, 循環(huán)等待container執(zhí)行完畢,并上報(bào)執(zhí)行結(jié)果

                
                  void waitComplete() throws YarnException, IOException{
                
                
                    while(numTotalContainers.get() != numCompletedConatiners.get()){
                
                
                       try{
                
                
                        Thread.sleep(1000);
                
                
                        LOG.info("waitComplete" + 
                
                
                           ", numTotalContainers=" + numTotalContainers.get() +
                
                
                           ", numCompletedConatiners=" + numCompletedConatiners.get());
                
                
                       } catch (InterruptedException ex){}
                
                
                    }
                
                
                    exeService.shutdown();
                
                
                    amNMClient.stop();
                
                
                    amRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "dummy Message", null);
                
                
                    amRMClient.stop();
                
                
                  }
                
              


                  

          三、YARN Container Application

          真正處理數(shù)據(jù)的是由ApplicationMasteramNMClient.startContainerAsync(container, ctx)提交的 Containerapplication,  然后這這個(gè)應(yīng)用并不需要特殊編寫,任何程序通過提交相應(yīng)的運(yùn)行信息都可以在這些Node中的某個(gè)Container 中執(zhí)行,所以這個(gè)程序可以是一個(gè)復(fù)雜的MapReduce Task 或者是一個(gè)簡單的腳本。

           

          總結(jié):

          YARN 提供了對cluster 資源管理 作業(yè)調(diào)度的功能。

          編寫一個(gè)應(yīng)用運(yùn)行在YARN 之上,比較復(fù)雜的是ApplicationMaster的編寫,其需要維護(hù)container 的狀態(tài)并能共做一些錯誤恢復(fù),重啟應(yīng)用的操作。比較簡答的是Client的編寫,只需要提交必須的信息既可以,不需要維護(hù)狀態(tài)。真正運(yùn)行處理數(shù)據(jù)的是Container Application ,這個(gè)程序可以不需要針對YARN做代碼編寫

           

          四、Spark Yarn Client模式

          Spark Yarn有兩種模式,一直是client模式,一種是cluster模式,今天我們先說說client模式,以下是Spark YarnClient的交互圖。

           

                                     

          676e1ce15fb0da021d43cb3b097bbea3.webp

          2 Spark Yarn Client 模式


          1697abd39c6ccab068f9de099fc8a64b.webp


          3 Spark Yarn 類圖

           

          f0c6d3feb35ece4a71126a920d4b2279.webp


          4 Spark Yarn Client 模式 job 提交過程


          瀏覽 43
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  婷婷欧美| 欧美日韩在线视频免费播放 | 日日摸日日操 | 日韩毛片免费看 | 国产黄片乱伦 |