欢迎您访问程序员文章站!本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

在Idea以yarn-cluster/client方式提交spark程序到yarn上

程序员文章站 2022-04-01 15:44:28
...

前期准备:

  1. 已在本机 hosts 文件中配置好集群各主机名与 IP 的映射
  2. 本地解压spark安装包
  3. 本地解压Hadoop安装包
  4. 从集群上下载 YARN 的客户端配置文件并解压(以 CDH 为例,下载后直接解压即可,存放位置没有要求)。(配图:在Idea以yarn-cluster/client方式提交spark程序到yarn上)

 

编写代码:

提交代码类:

package com.hzx.spark;

import org.apache.spark.launcher.SparkLauncher;
import java.io.*;
import java.util.*;

/**
 * Created by Huangzx
 * Date:16/6/20
 * Time:4:47 PM
 */
public class SparkLaunchTest {
    public static void main(String[] args) {
        launch();
    }
    private static void launch() {
        Map<String, String> env = new HashMap();
        env.put("hadoop.home.dir", "/Users/huangzhenxuan/software/hadoop-2.9.2");  //Hadoop安装包解压后的路径
        env.put("HADOOP_HOME", "/Users/huangzhenxuan/software/hadoop-2.9.2");  //Hadoop安装包解压后的路径
        env.put("spark.home.dir", "/Users/huangzhenxuan/software/spark-2.2.0-bin-hadoop2.7"); //spark安装包解压后的路径
        env.put("SPARK_HOME", "/Users/huangzhenxuan/software/spark-2.2.0-bin-hadoop2.7");  //spark安装包解压后的路径
        env.put("HADOOP_CONF_DIR", "/Users/huangzhenxuan/software/yarn-client");  //下载的yarn配置文件路径
        env.put("YARN_CONF_DIR", "/Users/huangzhenxuan/software/yarn-client");  //下载的yarn配置文件路径
        env.put("HADOOP_USER_NAME", "huangzx");  //以***用户提交程序
      //  env.put("spark.yarn.jars","hdfs://dataexa-cluster-manager-2-2:8020/data/dataexa/insight/yarn-jars/*"); //想提交的快一点儿可以先把spark安装包下的jar目录下的所有jar包先上传到hdfs上

        try {
            SparkLauncher launcher = new SparkLa
uncher(env);
            launcher = launcher.setAppName("sparkLaunchTest").setAppResource("/Users/huangzhenxuan/software/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar").setMainClass("org.apache.spark.examples.SparkPi").setMaster("yarn").setVerbose(true); //设置spark程序名称,要提交运行的jar包以及jar包的主函数

            launcher = launcher.setDeployMode("cluster"); //设置提交方式
//            launcher.addSparkArg("spark.yarn.queue","root.users.huangzx");  //指定资源池,可以设置key,value形式的spark参数,比如调优参数
            launcher.addAppArgs("100");  //应用程序的参数

            Process process = launcher.launch();
            InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(),"input");
            Thread inputThread = new Thread(inputStreamReaderRunnable,"LogStreamReader input");  //获取一般日志
            inputThread.start();

            InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(),"error");
            Thread errorThread = new Thread(errorStreamReaderRunnable,"LogStreamReader error");  //获取错误日志
            errorThread.start();

            int exitCode = process.waitFor();  //等待程序执行完毕
            System.out.println("Finished! Exit code:" + exitCode);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

收集日志工具类:

package com.hzx.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
 * Copies every line read from an input stream to {@code System.out}, preceded by a
 * header line naming the stream. Intended to run on its own thread to drain a child
 * process's stdout or stderr. The stream is closed when reading finishes or fails.
 *
 * Created by Huangzx
 * Date:16/6/20
 * Time:7:30 PM
 */
public class InputStreamReaderRunnable implements Runnable {
    private final BufferedReader reader;

    private final String name;

    /**
     * @param is   stream to drain; decoded with the platform default charset
     * @param name label printed in the header line ("InputStream &lt;name&gt;:")
     */
    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("InputStream " + name + ":");
        // try-with-resources closes the reader (and the wrapped stream) even if readLine() throws.
        try (BufferedReader in = reader) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}