Spark Yarn Client模式解析
最近在定位Yarn的crash問題時(shí),順便把spark怎么使用yarn的好好的梳理了一遍。不過我先了解一下Yarn和怎么提交yarn的job的。
首先我們先看看Yarn的架構(gòu):

圖 1 Yarn 分布式架構(gòu)
-
ResourceManager
-
一個(gè)純粹的調(diào)度器
-
根據(jù)應(yīng)用程序的資源請求嚴(yán)格限制系統(tǒng)的可用資源
-
在保證容量、公平性及服務(wù)等級的情況下,優(yōu)化集群資源利用率,讓所有資源都得到充分的利用
-
由可插拔的調(diào)度器來應(yīng)用不同的調(diào)度算法,如注重容量調(diào)度還是注意公平調(diào)度
-
ApplicationManager
-
負(fù)責(zé)與ResourceManager協(xié)商資源,并和NodeManager協(xié)同工作來執(zhí)行和監(jiān)控Container以及他們的資源消耗
-
有責(zé)任與ResourceManager協(xié)商并獲取合適的資源Container,跟蹤他們的狀態(tài),以及監(jiān)控其進(jìn)展
-
在真實(shí)環(huán)境中,每一個(gè)應(yīng)用都有自己的ApplicationMaster的實(shí)例,但是也可為一組提供一個(gè)ApplicationMaster,比如Pig或者Hive的ApplicationMaster
一、 YarnClient 編寫
1. 創(chuàng)建Yarn客戶端
YarnClient yarnClient =YarnClient.createYarnClien。t();yarnClient.init(conf);yarnClient.start();
2. 創(chuàng)建Yarn應(yīng)用
YarnClientApplication app = yarnClient.createApplication();
3. 設(shè)置Applicaton 的名字,內(nèi)存 和 cpu 需求 以及 優(yōu)先級和 Queue 信息, YARN將根據(jù)這些信息來調(diào)度AppMaster
app.getApplicationSubmissionContext().setApplicationName("jenkins.ApplicationMaster");app.getApplicationSubmissionContext().setResource(Resource.newInstance(100,1));app.getApplicationSubmissionContext().setPriority(Priority.newInstance(0));app.getApplicationSubmissionContext().setQueue("default");
4. 設(shè)置ContainerLaunchContext,這一步, amContainer中包含了App Master 執(zhí)行需要的資源文件,環(huán)境變量和 啟動命令,這里將資源文件上傳到了HDFS,這樣在NODE Manager 就可以通過 HDFS取得這些文件
app.getApplicationSubmissionContext().setAMContainerSpec(amContainer);
5. 提交應(yīng)用
ApplicationId appId =yarnClient.submitApplication(app.getApplicationSubmissionContext());
二、YARNApplicationMaster 編寫
ApplicationMaster編寫的編寫比較復(fù)雜,其需要通Resource Manager 和 Node Manager 交互,
通過ResourceManager:申請Container,并接收Resource Manager的一些消息,如可用的Container,結(jié)束的Container等。
通過NodeManage: 啟動Container,并接收Node Manage 的一些消息,如Container的狀態(tài)變化以及Node狀態(tài)變化。
1. 創(chuàng)建一個(gè)AMRMClientAsync對象,這個(gè)對象負(fù)責(zé)與ResourceManager 交互
amRMClient= AMRMClientAsync.createAMRMClientAsync( 1000, new RMCallbackHandler());
這里的RMCallbackHandler是我們編寫的繼承自AMRMClientAsync.CallbackHandler 的一個(gè)類,其功能是處理由ResourceManager收到的消息,
其需要實(shí)現(xiàn)的方法由如下
public void onContainersCompleted(List<ContainerStatus> statuses);public void onContainersAllocated(List<Container> containers) ;public void onShutdownRequest() ;public void onNodesUpdated(List<NodeReport> updatedNodes) ;public void onError(Throwable e) ;
這里不考慮異常的情況下,只寫onContainersAllocated, onContainersCompleted 這兩個(gè)既可以, 一個(gè)是當(dāng)有新的Container 可以使用, 一個(gè)是Container 運(yùn)行結(jié)束。
在onContainersAllocated我們需要編寫啟動container的代碼,amNMClient.startContainerAsync(container, ctx); 這里的ctx 同Yarn Client 中第4步中的amContainer是同一個(gè)類型, 即這個(gè)container 執(zhí)行的一些資源,環(huán)境變量與命令等, 因?yàn)檫@是在回調(diào)函數(shù)中,為了保證時(shí)效性,這個(gè)操作最好放在線程池中異步操作。
在onContainersCompleted中,如果是失敗的Container,我們需要重新申請并啟動Container,(這一點(diǎn)有可能是YARN的 Fair Schedule 中會強(qiáng)制退出某些Container 以釋放資源)成功的將做記錄既可以。
2. 創(chuàng)建一個(gè)NMClientAsyncImpl對象,這個(gè)對象負(fù)責(zé)與NodeManager 交互
amNMClient= new NMClientAsyncImpl(new NMCallbackHandler());
這里NMCallbackHandler使我們需要編寫的繼承自NMClientAsync.CallbackHandler 的對象,其功能是處理由NodeManager 收到的消息
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse);public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus);public void onContainerStopped(ContainerId containerId) ;public void onStartContainerError(ContainerId containerId, Throwable t);public void onGetContainerStatusError(ContainerId containerId, Throwable t) ;public void onStopContainerError(ContainerId containerId, Throwable t);
這里簡單的不考慮異常的情況下,這些函數(shù)可以寫一個(gè)空函數(shù)體,忽略掉處理
3. 將ApplicationMaster注冊到ResourceManager 上
RegisterApplicationMasterResponseresponse = amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1,"");
這個(gè)函數(shù)將自己注冊到RM上,這里沒有提供RPCport 和TrackURL.
4. ApplicationMaster向ResourceManager申請Container
ContainerRequestcontainerAsk = new ContainerRequest(//100*10M + 1vcpuResource.newInstance(100, 1), null, null,Priority.newInstance(0));amRMClient.addContainerRequest(containerAsk);
這里一個(gè)containerAsk表示申請一個(gè)Container,這里的對nodes和rasks設(shè)置為NULL,猜測MapReduce應(yīng)該由參數(shù)來嘗試申請靠近HDFSblock的container的
5. 申請到Container后,回調(diào)AMRMClientAsync.CallbackHandler的onContainersAllocated就會響應(yīng),然后通過amNMClient在Container運(yùn)行計(jì)算任務(wù):
List<String>commands = new LinkedList<String>();commands.add("sleep"+ sleepSeconds.addAndGet(1));ContainerLaunchContextctx = ContainerLaunchContext.newInstance(null, null, commands, null, null,null);amNMClient.startContainerAsync(container,ctx);
6. 等待Container 執(zhí)行完畢,清理退出
我的代碼如下, 循環(huán)等待container執(zhí)行完畢,并上報(bào)執(zhí)行結(jié)果
void waitComplete() throws YarnException, IOException{while(numTotalContainers.get() != numCompletedConatiners.get()){try{Thread.sleep(1000);LOG.info("waitComplete" +", numTotalContainers=" + numTotalContainers.get() +", numCompletedConatiners=" + numCompletedConatiners.get());} catch (InterruptedException ex){}}exeService.shutdown();amNMClient.stop();amRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "dummy Message", null);amRMClient.stop();}
三、YARN Container Application
真正處理數(shù)據(jù)的是由ApplicationMaster由amNMClient.startContainerAsync(container, ctx)提交的 Containerapplication, 然后這這個(gè)應(yīng)用并不需要特殊編寫,任何程序通過提交相應(yīng)的運(yùn)行信息都可以在這些Node中的某個(gè)Container 中執(zhí)行,所以這個(gè)程序可以是一個(gè)復(fù)雜的MapReduce Task 或者是一個(gè)簡單的腳本。
總結(jié):
YARN 提供了對cluster 資源管理 和 作業(yè)調(diào)度的功能。
編寫一個(gè)應(yīng)用運(yùn)行在YARN 之上,比較復(fù)雜的是ApplicationMaster的編寫,其需要維護(hù)container 的狀態(tài)并能共做一些錯誤恢復(fù),重啟應(yīng)用的操作。比較簡答的是Client的編寫,只需要提交必須的信息既可以,不需要維護(hù)狀態(tài)。真正運(yùn)行處理數(shù)據(jù)的是Container Application ,這個(gè)程序可以不需要針對YARN做代碼編寫
四、Spark Yarn Client模式
Spark Yarn有兩種模式,一直是client模式,一種是cluster模式,今天我們先說說client模式,以下是Spark YarnClient的交互圖。

圖 2 Spark Yarn Client 模式

圖 3 Spark Yarn 類圖

圖 4 Spark Yarn Client 模式 job 提交過程
