Submitting a Spark program to YARN in yarn-cluster/client mode from IDEA
Preparation:
- The cluster's hostnames and IPs are already mapped in the local hosts file
- The Spark distribution is unpacked locally
- The Hadoop distribution is unpacked locally
- The YARN client configuration has been downloaded from the cluster and unpacked (for CDH, just download and unpack it; the location does not matter); a quick sanity check for these paths is sketched below
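As a quick sanity check for the prerequisites above, here is a minimal sketch. The class name and the checked file names are illustrative only, and the paths simply reuse the example paths from the code further below; adjust them to your own machine:

import java.io.File;

public class EnvCheck {
    public static void main(String[] args) {
        // Example paths, taken from the submission class below.
        String sparkHome = "/Users/huangzhenxuan/software/spark-2.2.0-bin-hadoop2.7";
        String hadoopHome = "/Users/huangzhenxuan/software/hadoop-2.9.2";
        String yarnConfDir = "/Users/huangzhenxuan/software/yarn-client";

        require(new File(sparkHome, "bin/spark-submit"), "spark-submit script");
        require(new File(hadoopHome, "bin/hadoop"), "hadoop script");
        // The downloaded YARN client config should contain the cluster's *-site.xml files.
        require(new File(yarnConfDir, "core-site.xml"), "core-site.xml");
        require(new File(yarnConfDir, "yarn-site.xml"), "yarn-site.xml");
        System.out.println("All prerequisites found.");
    }

    private static void require(File f, String what) {
        if (!f.exists()) {
            throw new IllegalStateException("Missing " + what + ": " + f.getAbsolutePath());
        }
    }
}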
Writing the code:
The submission class:
package com.hzx.spark;
import org.apache.spark.launcher.SparkLauncher;
import java.io.*;
import java.util.*;
/**
* Created by Huangzx
* Date:16/6/20
* Time:4:47 PM
*/
public class SparkLaunchTest {

    public static void main(String[] args) {
        launch();
    }

    private static void launch() {
        Map<String, String> env = new HashMap<>();
        env.put("hadoop.home.dir", "/Users/huangzhenxuan/software/hadoop-2.9.2"); // path where the Hadoop distribution was unpacked
        env.put("HADOOP_HOME", "/Users/huangzhenxuan/software/hadoop-2.9.2"); // path where the Hadoop distribution was unpacked
        env.put("spark.home.dir", "/Users/huangzhenxuan/software/spark-2.2.0-bin-hadoop2.7"); // path where the Spark distribution was unpacked
        env.put("SPARK_HOME", "/Users/huangzhenxuan/software/spark-2.2.0-bin-hadoop2.7"); // path where the Spark distribution was unpacked
        env.put("HADOOP_CONF_DIR", "/Users/huangzhenxuan/software/yarn-client"); // path of the downloaded YARN config files
        env.put("YARN_CONF_DIR", "/Users/huangzhenxuan/software/yarn-client"); // path of the downloaded YARN config files
        env.put("HADOOP_USER_NAME", "huangzx"); // submit the application as this user
        // env.put("spark.yarn.jars", "hdfs://dataexa-cluster-manager-2-2:8020/data/dataexa/insight/yarn-jars/*"); // to speed up submission, first upload all jars under the Spark distribution's jars directory to HDFS
        try {
            SparkLauncher launcher = new SparkLauncher(env);
            launcher = launcher.setAppName("sparkLaunchTest")
                    .setAppResource("/Users/huangzhenxuan/software/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar")
                    .setMainClass("org.apache.spark.examples.SparkPi")
                    .setMaster("yarn")
                    .setVerbose(true); // set the application name, the jar to submit, and the jar's main class
            launcher = launcher.setDeployMode("cluster"); // set the deploy mode (cluster or client)
            // launcher.setConf("spark.yarn.queue", "root.users.huangzx"); // specify the resource pool; other key/value Spark configs (e.g. tuning parameters) can be set the same way
            launcher.addAppArgs("100"); // arguments passed to the application
            Process process = launcher.launch();
            InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
            Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input"); // capture standard output
            inputThread.start();
            InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
            Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error"); // capture error output
            errorThread.start();
            int exitCode = process.waitFor(); // wait for the spark-submit process to finish
            System.out.println("Finished! Exit code:" + exitCode);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
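If you would rather not manage the child process's streams yourself, the SparkLauncher API also provides startApplication(), which returns a SparkAppHandle whose state changes can be observed through a listener. The following is a minimal sketch, assuming the same env map and launcher settings as above; the class name and printed messages are illustrative and not part of the original article:

package com.hzx.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SparkStartApplicationTest {
    public static void main(String[] args) throws IOException, InterruptedException {
        Map<String, String> env = new HashMap<>(); // fill in the same entries as in SparkLaunchTest
        SparkAppHandle handle = new SparkLauncher(env)
                .setAppName("sparkLaunchTest")
                .setAppResource("/Users/huangzhenxuan/software/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar")
                .setMainClass("org.apache.spark.examples.SparkPi")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle h) {
                        System.out.println("state: " + h.getState() + ", appId: " + h.getAppId());
                    }
                    @Override
                    public void infoChanged(SparkAppHandle h) {
                        // called when application info (e.g. the YARN application id) changes
                    }
                });
        // Poll until the application reaches a final state.
        while (!handle.getState().isFinal()) {
            Thread.sleep(3000);
        }
        System.out.println("Final state: " + handle.getState());
    }
}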
The log-collecting utility class:
package com.hzx.spark;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* Created by Huangzx
* Date:16/6/20
* Time:7:30 PM
*/
public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;
    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = "";
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
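As a side note, Spark 2.x's SparkLauncher can also redirect the child process's output for you, so a hand-rolled reader thread is not strictly required. The fragment below is a sketch only and is meant to slot into launch() in SparkLaunchTest above; the file paths are placeholders:

            // Sketch: replace the two reader threads by letting SparkLauncher
            // redirect spark-submit's output to local files.
            launcher = launcher
                    .redirectOutput(new File("/tmp/spark-launch.out")) // placeholder path for stdout
                    .redirectError(new File("/tmp/spark-launch.err")); // placeholder path for stderr
            // launcher.launch() and process.waitFor() then work exactly as before.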