Java Web提交任务到Spark Standalone集群并监控
Java Web提交任务到Spark Standalone集群并监控
1. 环境
软件 | 版本 | 备注 |
---|---|---|
IDEA | 14.1.5 | |
JDK | 1.8 | |
Spark | 1.6.0 | 工程maven引用 |
Spark | cdh5.7.3-spark1.6.0 | 实际集群5.7.3-1.cdh5.7.3.p0.5 |
Hadoop | 2.6.4 | 工程Maven引用 |
Hadoop | 2.6.0-cdh5.7.3 | 实际集群参数 |
Maven | 3.3 |
2. 工程下载路径
工程在GitHub上地址为: javaweb_spark_standalone_monitor
3. Spark任务提交流程
之前做过相关的工作,知道可以通过下面的方式来提交任务到Spark Standalone集群:
String[] arg0=new String[]{
"--master","spark://server2.tipdm.com:6066",
"--deploy-mode","cluster",
"--name",appName,
"--class",className,
"--executor-memory","2G",
"--total-executor-cores","10",
"--executor-cores","2",
path,
"/user/root/a.txt",
"/tmp/"+System.currentTimeMillis()
};
SparkSubmit.main(arg0);
1. 这里要注意的是,这里使用的模式是cluster,而非client,也就是说driver程序也是运行在集群中的,而非提交的客户端,也就是我Win10本地。
2. 如果需要使用client提交,那么需要注意本地资源是否足够;同时因为这里使用的是cluster,所以需要确保集群资源同时可以运行一个driver以及executor(即,最少需要同时运行两个Container)
3. 其中的path,也就是打的jar包需要放到集群各个slave节点中的对应位置。比如lz集群中有node1,node2,node3 ,那么就需要把wc.jar放到这三个节点上,比如放到/tmp/wc.jar ,那么path的设置就要设置为file:/opt/wc.jar ,如果直接使用/opt/wc.jar 那么在进行参数解析的时候会被解析成file:/c:/opt/wc.jar (因为lz使用的是win10运行Tomcat),从而报jar包文件找不到的错误!
进入SparkSubmit.main源码,可以看到如下代码:
def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
代码里面是通过submit来提交任务的,顺着这条线往下,则最终是通过
mainMethod.invoke 是通过反射来调用的,通过debug可以得到,这里反射调用的其实是:RestSubmissionClient 的main函数提交任务的。
所以这里可以模仿RestSubmissionClient来提交任务。程序如下:
public static String submit(String appResource,String mainClass,String ...args){
SparkConf sparkConf = new SparkConf();
// 下面的是参考任务实时提交的Debug信息编写的
sparkConf.setMaster(MASTER);
sparkConf.setAppName(APPNAME+" "+ System.currentTimeMillis());
sparkConf.set("spark.executor.cores","2");
sparkConf.set("spark.submit.deployMode","cluster");
sparkConf.set("spark.jars",appResource);
sparkConf.set("spark.executor.memory","2G");
sparkConf.set("spark.cores.max","2");
sparkConf.set("spark.driver.supervise","false");
Map<String,String> env = filterSystemEnvironment(System.getenv());
CreateSubmissionResponse response = null;
try {
response = (CreateSubmissionResponse)
RestSubmissionClient.run(appResource, mainClass, args, sparkConf, toScalaMap(env));
}catch (Exception e){
e.printStackTrace();
return null;
}
return response.submissionId();
}
如果不加其中的
sparkConf.set
…
则程序运行会有问题,第一个错误就是:
java.lang.IllegalArgumentException: Invalid environment variable name: “=::”
这个错误是因为模式设置不对(没有设置cluster模式),所以在进行参数匹配的时候异常。可以看到的参数如下图所示:
这里面对应的参数,其实就是SparkSubmit提交任务所对应的值了。
4. 问题及问题解决
问题提出:
1. 最近一段时间,在想运行Spark的任务的时候为什么要提交到YARN上,而且通过实践发现,提交到YARN上程序运行比Spark Standalone运行要慢的多,所以是否能直接提交任务到Spark Standalone集群呢?
2. 提交任务到Spark Standalone集群后,如何获得任务的id,方便后面的监控呢?
3. 获得任务id后,怎么监控?
针对这三个问题,解答如下:
1. 第一个问题,应该是见仁见智的问题了,使用SparkONYARN的方式可以统一生态圈什么的;
2. 在上面的代码中已经可以提交任务,并且获取任务ID了。不过需要注意的是,通过:
response = (CreateSubmissionResponse)
RestSubmissionClient.run(appResource, mainClass, args, sparkConf, toScalaMap(env));
获取的response需要转型为CreateSubmissionResponse,才能获得submittedId,但是要访问CreateSubmissionResponse,那么需要在某些包下面才行,所以lz的SparkEngine类才会定义在org.apache.spark.deploy.rest包中。
第三:
监控,监控就更简单了,可以参考:
private def requestStatus(args: SparkSubmitArguments): Unit = {
new RestSubmissionClient(args.master)
.requestSubmissionStatus(args.submissionToRequestStatusFor)
}
这里就是监控的代码了,lz参考这段代码写了个监控,详见GitHub。
后记
在提交任务到Spark Standalone的时候,lz发现driver和实际的任务是分开的,如下:
发现是driver 调用app,本来想着,driver是不是提交后,就Over了,结果发现driver会一直监控app的状态,如果app运行成功结束,那么driver状态就会返回FINISHED,如果失败,则driver状态也是ERROR。所以可以直接监控driver来监控整个任务。
使用Spark Standalone来运行Spark程序,确实比Spark On YARN快的多了!
分享,成长,快乐
脚踏实地,专注
转载请注明blog地址:http://blog.csdn.net/fansy1990