A tool for submitting Flink/Spark jobs from a local IDEA to a cluster
Supports Flink job submission in yarnPerJob, Standalone, and yarnSession modes. Supports submitting Spark jobs to YARN in Yarn Cluster mode; the user jar is uploaded automatically, but the Spark jars the job depends on must be uploaded to HDFS in advance. Supports submitting Spark jobs to a K8s cluster: the jar to execute must be included in the image, and the image name and the path of the executable jar are passed at submission time. To operate on Hive tables, additionally pass the cluster configuration directory and HADOOP_USER_NAME; the tool then mounts the Hadoop configuration files and sets the environment variables.
Flink job submission in multiple execution modes
Fill in the Flink runtime parameters, the path to the cluster configuration the job runs against, and the local Flink root directory. The project depends on Flink 1.10. Jobs can be submitted in YarnSession, YarnPerjob, or Standalone mode, and the submission returns the ApplicationId. The example module contains a FlinkDemo; after packaging it is copied into the project's exampleJars directory and can be used to try a submission. Once the job is submitted, the ApplicationId is used to fetch basic information about the JobManager and TaskManager logs, including the log access URL and total log size in bytes. With this information the logs can be displayed in a rolling fashion so that reading an oversized Yarn log does not stall (a sketch follows the sample output below).
// Path of the executable jar
String runJarPath = "/Users/maqi/code/ClustersSubmiter/exampleJars/flink-kafka-reader/flink-kafka-reader.jar";
// Job arguments
String[] execArgs = new String[]{"-jobName", "flink110Submit", "--topic", "mqTest01", "--bootstrapServers", "172.16.8.107:9092"};
// Job name
String jobName = "Flink perjob submit";
// Flink conf directory
String flinkConfDir = "/Users/maqi/tmp/flink/flink-1.10.0/conf";
// Flink lib directory
String flinkJarPath = "/Users/maqi/tmp/flink/flink-1.10.0/lib";
// Yarn conf directory
String yarnConfDir = "/Users/maqi/tmp/hadoopconf";
// External files the job depends on, e.g. udf jar, keytab
String[] dependFile = new String[]{"/Users/maqi/tmp/flink/flink-1.10.0/README.txt"};
// Queue to submit to
String queue = "root.users.hdfs";
// Flink execution mode
String execMode = "yarnPerjob";
// yarnsession appid configuration
Properties yarnSessionConfProperties = null;
// savepoint and parallelism related settings
Properties confProperties = new Properties();
confProperties.setProperty("parallelism", "1");

JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
        .setExecArgs(execArgs)
        .setName(jobName)
        .setRunJarPath(runJarPath)
        .setDependFile(dependFile)
        .setFlinkConfDir(flinkConfDir)
        .setYarnConfDir(yarnConfDir)
        .setConfProperties(confProperties)
        .setYarnSessionConfProperties(yarnSessionConfProperties)
        .setFlinkJarPath(flinkJarPath)
        .setQueue(queue)
        .build();

String applicationId = runFlinkJob(jobParamsInfo, execMode);

// After the job starts, pull the JobManager/TaskManager log information.
Thread.sleep(20000);
List<String> logsInfo = new RunningLog().getRollingLogBaseInfo(jobParamsInfo, applicationId);
logsInfo.forEach(System.out::println);
{"logs":[{"name":"jobmanager.err ","totalBytes":"555","url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.err/"},{"name":"jobmanager.log ","totalBytes":"31944","url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.log/"},{"name":"jobmanager.out ","totalBytes":"0","url":"http://172-16-10-204:8042/node/containerlogs/container_e185_1593317332045_2246_01_000002/admin/jobmanager.out/"}],"typeName":"jobmanager"}
{"logs":[{"name":"taskmanager.err ","totalBytes":"560","url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.err/"},{"name":"taskmanager.log ","totalBytes":"35937","url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.log/"},{"name":"taskmanager.out ","totalBytes":"0","url":"http://node03:8042/node/containerlogs/container_e27_1593571725037_0170_01_000002/admin/taskmanager.out/"}],"otherInfo":"{"dataPort":36218,"freeSlots":0,"hardware":{"cpuCores":4,"freeMemory":241172480,"managedMemory":308700779,"physicalMemory":8201641984},"id":"container_e27_1593571725037_0170_01_000002","path":"akka.tcp://flink@node03:36791/user/taskmanager_0","slotsNumber":1,"timeSinceLastHeartbeat":1593659561129}","typeName":"taskmanager"}
Spark on Yarn job submission
Fill in the user program jar path, execution arguments, cluster configuration directory, security (Kerberos) settings, and other related options. Spark jobs are submitted in Yarn cluster mode; the Spark jars they use must be uploaded to HDFS in advance and passed via the archive parameter. For SparkSQL jobs, Hive tables can be operated on directly by submitting the spark-sql-proxy jar from the examples.
public static void main(String[] args) throws Exception {
    boolean openKerberos = true;
    String appName = "todd spark submit";
    String runJarPath = "/Users/maqi/code/ClustersSubmiter/exampleJars/spark-sql-proxy/spark-sql-proxy.jar";
    String mainClass = "cn.todd.spark.SparksqlProxy";
    String yarnConfDir = "/Users/maqi/tmp/hadoopconf";
    String principal = "hdfs/[email protected]";
    String keyTab = "/Users/maqi/tmp/hadoopconf/hdfs.keytab";
    String jarHdfsDir = "sparkproxy2";
    String archive = "hdfs://nameservice1/sparkjars/jars";
    String queue = "root.users.hdfs";
    String execArgs = getExampleJobParams();

    Properties confProperties = new Properties();
    confProperties.setProperty("spark.executor.cores", "2");

    JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
            .setAppName(appName)
            .setRunJarPath(runJarPath)
            .setMainClass(mainClass)
            .setYarnConfDir(yarnConfDir)
            .setPrincipal(principal)
            .setKeytab(keyTab)
            .setJarHdfsDir(jarHdfsDir)
            .setArchivePath(archive)
            .setQueue(queue)
            .setExecArgs(execArgs)
            .setConfProperties(confProperties)
            .setOpenKerberos(BooleanUtils.toString(openKerberos, "true", "false"))
            .build();

    YarnConfiguration yarnConf = YarnConfLoaderUtil.getYarnConf(yarnConfDir);

    String applicationId = "";
    if (BooleanUtils.toBoolean(openKerberos)) {
        UserGroupInformation.setConfiguration(yarnConf);
        UserGroupInformation userGroupInformation =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
        applicationId = userGroupInformation.doAs(
                (PrivilegedExceptionAction<String>) () -> LauncherMain.run(jobParamsInfo, yarnConf));
    } else {
        applicationId = LauncherMain.run(jobParamsInfo, yarnConf);
    }
    System.out.println(applicationId);
}
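After submission, the returned applicationId can be used to poll the job's state on YARN. A minimal sketch using the standard YarnClient API follows; it is not part of this project and assumes Hadoop 2.8+, where ApplicationId.fromString is available.

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnStatusChecker {
    /** Look up the current state of a submitted application, e.g. "application_1593317332045_2246". */
    public static YarnApplicationState getState(YarnConfiguration yarnConf, String applicationId) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConf);
        yarnClient.start();
        try {
            ApplicationReport report =
                    yarnClient.getApplicationReport(ApplicationId.fromString(applicationId));
            return report.getYarnApplicationState();
        } finally {
            yarnClient.stop();
        }
    }
}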
Spark on K8s job submission
Developed against Spark 2.4.4. SparkSQL is executed and Hive tables are operated on by packaging spark-sql-proxy.jar into the image; nothing else special is required. When operating on Hive, pass hadoopConfDir and the .xml files in it are mounted automatically; if Hive is accessed as a non-root user, HADOOP_USER_NAME must also be set. The Kubernetes client is created from a kubeConfig file rather than via the official master-URL approach. Submission immediately returns the spark-app-selector id, which can then be used to query POD status (see the sketch after the example below).
public static void main(String[] args) throws Exception {
    String appName = "todd spark submit";
    // Jar path inside the image
    String runJarPath = "local:///opt/dtstack/spark/spark-sql-proxy.jar";
    String mainClass = "cn.todd.spark.SparksqlProxy";
    String hadoopConfDir = "/Users/maqi/tmp/hadoopconf/";
    String kubeConfig = "/Users/maqi/tmp/conf/k8s.config";
    String imageName = "mqspark:2.4.4";
    String execArgs = getExampleJobParams();

    Properties confProperties = new Properties();
    confProperties.setProperty("spark.executor.instances", "2");
    confProperties.setProperty("spark.kubernetes.namespace", "default");
    confProperties.setProperty("spark.kubernetes.authenticate.driver.serviceAccountName", "spark");
    confProperties.setProperty("spark.kubernetes.container.image.pullPolicy", "IfNotPresent");

    JobParamsInfo jobParamsInfo = JobParamsInfo.builder()
            .setAppName(appName)
            .setRunJarPath(runJarPath)
            .setMainClass(mainClass)
            .setExecArgs(execArgs)
            .setConfProperties(confProperties)
            .setHadoopConfDir(hadoopConfDir)
            .setKubeConfig(kubeConfig)
            .setImageName(imageName)
            .build();

    String id = run(jobParamsInfo);
    System.out.println(id);
}
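A minimal sketch of querying POD status by the returned spark-app-selector id is shown below. It assumes the fabric8 kubernetes-client is on the classpath (the client library this project actually uses is not shown in the snippet above), and the namespace and file paths are placeholders.

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class SparkPodStatusChecker {
    /** List the pods of one submission via the spark-app-selector label and print their phases. */
    public static void printPodStatus(String kubeConfigPath, String namespace, String appSelectorId) throws Exception {
        // Build the client from a kubeConfig file, mirroring the kubeConfig-based approach above.
        String kubeConfigContent = new String(Files.readAllBytes(Paths.get(kubeConfigPath)));
        Config config = Config.fromKubeconfig(kubeConfigContent);
        try (KubernetesClient client = new DefaultKubernetesClient(config)) {
            List<Pod> pods = client.pods()
                    .inNamespace(namespace)
                    .withLabel("spark-app-selector", appSelectorId)
                    .list()
                    .getItems();
            for (Pod pod : pods) {
                System.out.println(pod.getMetadata().getName() + " -> " + pod.getStatus().getPhase());
            }
        }
    }
}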