欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java Web提交任务到Spark Standalone集群并监控

程序员文章站 2022-03-06 08:08:08
...

Java Web提交任务到Spark Standalone集群并监控

1. 环境

软件 版本 备注
IDEA 14.1.5
JDK 1.8
Spark 1.6.0 工程maven引用
Spark cdh5.7.3-spark1.6.0 实际集群5.7.3-1.cdh5.7.3.p0.5
Hadoop 2.6.4 工程Maven引用
Hadoop 2.6.0-cdh5.7.3 实际集群参数
Maven 3.3

2. 工程下载路径

工程在GitHub上地址为: javaweb_spark_standalone_monitor

3. Spark任务提交流程

之前做过相关的工作,知道可以通过下面的方式来提交任务到Spark Standalone集群:

String[] arg0=new String[]{
                "--master","spark://server2.tipdm.com:6066",
                "--deploy-mode","cluster",
                "--name",appName,
                "--class",className,
                "--executor-memory","2G",
                "--total-executor-cores","10",
                "--executor-cores","2",
                path,
                "/user/root/a.txt",
                "/tmp/"+System.currentTimeMillis()
        };
        SparkSubmit.main(arg0);

1. 这里要注意的是,这里使用的模式是cluster,而非client,也就是说driver程序也是运行在集群中的,而非提交的客户端,也就是我Win10本地。
2. 如果需要使用client提交,那么需要注意本地资源是否足够;同时因为这里使用的是cluster,所以需要确保集群资源同时可以运行一个driver以及executor(即,最少需要同时运行两个Container)
3. 其中的path,也就是打的jar包需要放到集群各个slave节点中的对应位置。比如lz集群中有node1,node2,node3 ,那么就需要把wc.jar放到这三个节点上,比如放到/tmp/wc.jar ,那么path的设置就要设置为file:/opt/wc.jar ,如果直接使用/opt/wc.jar 那么在进行参数解析的时候会被解析成file:/c:/opt/wc.jar (因为lz使用的是win10运行Tomcat),从而报jar包文件找不到的错误!

进入SparkSubmit.main源码,可以看到如下代码:

def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

代码里面是通过submit来提交任务的,顺着这条线往下,则最终是通过
Java Web提交任务到Spark Standalone集群并监控
mainMethod.invoke 是通过反射来调用的,通过debug可以得到,这里反射调用的其实是:RestSubmissionClient 的main函数提交任务的。
所以这里可以模仿RestSubmissionClient来提交任务。程序如下:

public static String submit(String appResource,String mainClass,String ...args){
        SparkConf sparkConf = new SparkConf();
        // 下面的是参考任务实时提交的Debug信息编写的
        sparkConf.setMaster(MASTER);
        sparkConf.setAppName(APPNAME+" "+ System.currentTimeMillis());
        sparkConf.set("spark.executor.cores","2");
        sparkConf.set("spark.submit.deployMode","cluster");
        sparkConf.set("spark.jars",appResource);
        sparkConf.set("spark.executor.memory","2G");
        sparkConf.set("spark.cores.max","2");
        sparkConf.set("spark.driver.supervise","false");
        Map<String,String> env = filterSystemEnvironment(System.getenv());
        CreateSubmissionResponse response = null;
        try {
            response = (CreateSubmissionResponse)
                    RestSubmissionClient.run(appResource, mainClass, args, sparkConf, toScalaMap(env));
        }catch (Exception e){
            e.printStackTrace();
            return null;
        }
        return response.submissionId();
    }

如果不加其中的

sparkConf.set

则程序运行会有问题,第一个错误就是:

java.lang.IllegalArgumentException: Invalid environment variable name: “=::”

这个错误是因为模式设置不对(没有设置cluster模式),所以在进行参数匹配的时候异常。可以看到的参数如下图所示:
Java Web提交任务到Spark Standalone集群并监控
这里面对应的参数,其实就是SparkSubmit提交任务所对应的值了。

4. 问题及问题解决

问题提出:
1. 最近一段时间,在想运行Spark的任务的时候为什么要提交到YARN上,而且通过实践发现,提交到YARN上程序运行比Spark Standalone运行要慢的多,所以是否能直接提交任务到Spark Standalone集群呢?
2. 提交任务到Spark Standalone集群后,如何获得任务的id,方便后面的监控呢?
3. 获得任务id后,怎么监控?

针对这三个问题,解答如下:
1. 第一个问题,应该是见仁见智的问题了,使用SparkONYARN的方式可以统一生态圈什么的;
2. 在上面的代码中已经可以提交任务,并且获取任务ID了。不过需要注意的是,通过:

response = (CreateSubmissionResponse)
RestSubmissionClient.run(appResource, mainClass, args, sparkConf, toScalaMap(env));

获取的response需要转型为CreateSubmissionResponse,才能获得submittedId,但是要访问CreateSubmissionResponse,那么需要在某些包下面才行,所以lz的SparkEngine类才会定义在org.apache.spark.deploy.rest包中。

第三:
监控,监控就更简单了,可以参考:

private def requestStatus(args: SparkSubmitArguments): Unit = {
    new RestSubmissionClient(args.master)
      .requestSubmissionStatus(args.submissionToRequestStatusFor)
  }

这里就是监控的代码了,lz参考这段代码写了个监控,详见GitHub

后记

在提交任务到Spark Standalone的时候,lz发现driver和实际的任务是分开的,如下:
Java Web提交任务到Spark Standalone集群并监控
发现是driver 调用app,本来想着,driver是不是提交后,就Over了,结果发现driver会一直监控app的状态,如果app运行成功结束,那么driver状态就会返回FINISHED,如果失败,则driver状态也是ERROR。所以可以直接监控driver来监控整个任务。

使用Spark Standalone来运行Spark程序,确实比Spark On YARN快的多了!


分享,成长,快乐

脚踏实地,专注

转载请注明blog地址:http://blog.csdn.net/fansy1990