<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          本地IDEA提交Flink/Spark任務(wù)到集群的工具

          共 8369字,需瀏覽 17分鐘

           ·

          2021-11-18 06:07

          作者鏈接:https://www.jianshu.com/p/ae1919fdf399
          文章中所述的spark/flink提交任務(wù)的github工程:
          https://github.com/todd5167/flink-spark-submiter
          Flink任務(wù)、Spark任務(wù)提交到集群,通常需要將可執(zhí)行Jar上傳到集群,手動執(zhí)行任務(wù)提交指令,如果有配套的大數(shù)據(jù)平臺則需要上傳Jar,由調(diào)度系統(tǒng)進行任務(wù)提交。對開發(fā)者來說,本地IDEA調(diào)試Flink、Spark任務(wù)不涉及對象的序列化及反序列化,任務(wù)在本地調(diào)試通過后,執(zhí)行在分布式環(huán)境下也可能會出錯。而將任務(wù)提交到集群進行調(diào)試還要走那些繁瑣的流程太影響效率了。
          因此,為方便大數(shù)據(jù)開發(fā)人員進行快速開發(fā)調(diào)試,開發(fā)了從本地IDEA提交Flink/Spark任務(wù)到集群的工具類 flink-spark-submiter。任務(wù)提交代碼稍加改造后也可以和上層調(diào)度系統(tǒng)進行集成,替代腳本模式進行任務(wù)提交的方式。
          • 支持Flink yarnPerJob、Standalone 、yarnSession模式下的任務(wù)提交。
          • 支持Spark任務(wù)以Yarn Cluster模式提交到Y(jié)ARN,支持自動上傳用戶Jar包,依賴的Spark Jars需要提前上傳到HDFS。
          • 支持Spark任務(wù)提交到K8s Cluster,執(zhí)行的jar需要包含在鏡像中,任務(wù)執(zhí)行時需要傳遞鏡像名稱及可執(zhí)行文件路徑。如果需要操作hive表,則需要傳遞集群所在文件夾,以及HADOOP_USER_NAME,系統(tǒng)進行Hadoop文件的掛載及環(huán)境變量的設(shè)置。

          Flink 多執(zhí)行模式任務(wù)提交

          • 需要填寫Flink任務(wù)運行時參數(shù)配置,任務(wù)運行所在的集群配置路徑,本地Flink根路徑。項目依賴flink1.10版本。
          • 支持以YarnSession、YarnPerjob、Standalone模式進行任務(wù)提交,返回ApplicationId。
          • example模塊下包含一個FlinkDemo,打包后會轉(zhuǎn)移到項目的examplJars中,可以嘗試進行任務(wù)提交。
          • 任務(wù)提交后,根據(jù)ApplicationId獲取任務(wù)執(zhí)行使用的jm、tm日志基本信息,包含日志訪問URL,日志總字節(jié)大小,根據(jù)日志基本信息可以做日志滾動展示,防止Yarn日志過大導(dǎo)致日志讀取卡死。
          任務(wù)提交示例:
          // 可執(zhí)行jar包路徑 String runJarPath = "/Users/maqi/code/ClustersSubmiter/exampleJars/flink-kafka-reader/flink-kafka-reader.jar"; // 任務(wù)參數(shù) String[] execArgs = new String[]{"-jobName", "flink110Submit", "--topic", "mqTest01", "--bootstrapServers", "172.16.8.107:9092"}; // 任務(wù)名稱 String jobName = "Flink perjob submit"; // flink 文件夾路徑 String flinkConfDir = "/Users/maqi/tmp/flink/flink-1.10.0/conf"; // flink lib包路徑 String flinkJarPath = "/Users/maqi/tmp/flink/flink-1.10.0/lib"; // yarn 文件夾路徑 // String yarnConfDir = "/Users/maqi/tmp/hadoopconf"; String yarnConfDir = "/Users/maqi/tmp/hadoopconf"; // 作業(yè)依賴的外部文件,例如:udf jar , keytab String[] dependFile = new String[]{"/Users/maqi/tmp/flink/flink-1.10.0/README.txt"}; // 任務(wù)提交隊列 String queue = "root.users.hdfs"; // flink任務(wù)執(zhí)行模式 String execMode = "yarnPerjob"; // yarnsession appid配置 Properties yarnSessionConfProperties = null; // savepoint 及并行度相關(guān) Properties confProperties = new Properties(); confProperties.setProperty("parallelism", "1");
          JobParamsInfo jobParamsInfo = JobParamsInfo.builder() .setExecArgs(execArgs) .setName(jobName) .setRunJarPath(runJarPath) .setDependFile(dependFile) .setFlinkConfDir(flinkConfDir) .setYarnConfDir(yarnConfDir) .setConfProperties(confProperties) .setYarnSessionConfProperties(yarnSessionConfProperties) .setFlinkJarPath(flinkJarPath) .setQueue(queue) .build();

          String applicationId = runFlinkJob(jobParamsInfo, execMode); //任務(wù)啟動后,拉取jm,tm日志相關(guān)信息。 Thread.sleep(20000); List logsInfo = new RunningLog().getRollingLogBaseInfo(jobParamsInfo, applicationId); logsInfo.forEach(System.out::println);
          jobmanager日志格式:
          {   "logs":[       {           "name":"jobmanager.err ",           "totalBytes":"555",           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.err/"       },       {           "name":"jobmanager.log ",           "totalBytes":"31944",           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.log/"       },       {           "name":"jobmanager.out ",           "totalBytes":"0",           "url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.out/"       }   ],   "typeName":"jobmanager"}

          taskmanager日志格式:
          {    "logs":[        {            "name":"taskmanager.err ",            "totalBytes":"560",            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.err/"        },        {            "name":"taskmanager.log ",            "totalBytes":"35937",            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.log/"        },        {            "name":"taskmanager.out ",            "totalBytes":"0",            "url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.out/"        }    ],    "otherInfo":"{"dataPort":36218,"freeSlots":0,"hardware":{"cpuCores":4,"freeMemory":241172480,"managedMemory":308700779,"physicalMemory":8201641984},"id":"container_e27_1593571725037_0170_01_000002","path":"akka.tcp://flink@node03:36791/user/taskmanager_0","slotsNumber":1,"timeSinceLastHeartbeat":1593659561129}",    "typeName":"taskmanager"}

          Spark on yarn 任務(wù)提交

          • 填寫用戶程序包路徑、執(zhí)行參數(shù)、集群配置文件夾、安全認證等相關(guān)配置。
          • Spark任務(wù)提交使用Yarn cluster模式,使用的Spark Jar需要提前上傳到HDFS并作為archive的參數(shù)。
          • 針對SparkSQL任務(wù),通過提交examples中的spark-sql-proxy程序包來直接操作hive表。
          提交示例:

          public static void main(String[] args) throws Exception { boolean openKerberos = true; String appName = "todd spark submit"; String runJarPath = "/Users/maqi/code/ClustersSubmiter/exampleJars/spark-sql-proxy/spark-sql-proxy.jar"; String mainClass = "cn.todd.spark.SparksqlProxy"; String yarnConfDir = "/Users/maqi/tmp/hadoopconf"; String principal = "hdfs/[email protected]"; String keyTab = "/Users/maqi/tmp/hadoopconf/hdfs.keytab"; String jarHdfsDir = "sparkproxy2"; String archive = "hdfs://nameservice1/sparkjars/jars"; String queue = "root.users.hdfs"; String execArgs = getExampleJobParams();
          Properties confProperties = new Properties(); confProperties.setProperty("spark.executor.cores","2");
          JobParamsInfo jobParamsInfo = JobParamsInfo.builder() .setAppName(appName) .setRunJarPath(runJarPath) .setMainClass(mainClass) .setYarnConfDir(yarnConfDir) .setPrincipal(principal) .setKeytab(keyTab) .setJarHdfsDir(jarHdfsDir) .setArchivePath(archive) .setQueue(queue) .setExecArgs(execArgs) .setConfProperties(confProperties) .setOpenKerberos(BooleanUtils.toString(openKerberos, "true", "false")) .build();
          YarnConfiguration yarnConf = YarnConfLoaderUtil.getYarnConf(yarnConfDir); String applicationId = "";
          if (BooleanUtils.toBoolean(openKerberos)) { UserGroupInformation.setConfiguration(yarnConf); UserGroupInformation userGroupInformation = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab); applicationId = userGroupInformation.doAs((PrivilegedExceptionAction) () -> LauncherMain.run(jobParamsInfo, yarnConf)); } else { LauncherMain.run(jobParamsInfo, yarnConf); }
          System.out.println(applicationId); }

          Spark on k8s 任務(wù)提交

          • 基于Spark2.4.4進行開發(fā),通過將spark-sql-proxy.jar包打入鏡像來執(zhí)行Sparksql并操作Hive表,無其他特殊操作。
          • 操作Hive時需要傳遞hadoopConfDir,程序會自動將.xml文件內(nèi)容進行掛載,如果非root用戶操作Hive,需要設(shè)置HADOOP_USER_NAME。
          • 通過讀取kubeConfig配置文件進行Kuberclient的創(chuàng)建,而非官方提供的master url方式。
          • 任務(wù)提交后立即返回spark-app-selector id,從而進行POD狀態(tài)獲取。
          public static void main(String[] args) throws Exception { String appName = "todd spark submit"; // 鏡像內(nèi)的jar路徑 String runJarPath = "local:///opt/dtstack/spark/spark-sql-proxy.jar"; String mainClass = "cn.todd.spark.SparksqlProxy"; String hadoopConfDir = "/Users/maqi/tmp/hadoopconf/"; String kubeConfig = "/Users/maqi/tmp/conf/k8s.config"; String imageName = "mqspark:2.4.4"; String execArgs = getExampleJobParams();
          Properties confProperties = new Properties(); confProperties.setProperty("spark.executor.instances", "2"); confProperties.setProperty("spark.kubernetes.namespace", "default"); confProperties.setProperty("spark.kubernetes.authenticate.driver.serviceAccountName", "spark"); confProperties.setProperty("spark.kubernetes.container.image.pullPolicy", "IfNotPresent");

          JobParamsInfo jobParamsInfo = JobParamsInfo.builder() .setAppName(appName) .setRunJarPath(runJarPath) .setMainClass(mainClass) .setExecArgs(execArgs) .setConfProperties(confProperties) .setHadoopConfDir(hadoopConfDir) .setKubeConfig(kubeConfig) .setImageName(imageName) .build();
          String id = run(jobParamsInfo); System.out.println(id); }
          閱讀原文直接可以跳轉(zhuǎn)至作者的github



          瀏覽 39
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一级a一级a爰片免费免免小说 | 国内精品综合 | 高清无码视频在线看 | www.欧美在线观看 | 国产精品久久久久久久免费大片 |