windows远程提交Spark作业到linux集群,并尝试多种模式运行
本地构建java+scala混合工程:
- 创建maven项目,选择scala-archetype-simpleprojectstructure(也可以空maven项目自己配置)。
- 注意:更改默认生成的scala-version,以及-target:jvm的版本
- 添加scala SDK,注意与Spark对应的版本
- 添加Spark的依赖包(针对需要使用Spark相关包的module导入,也可以用Maven导,我习惯了用本地Spark的jars)
- src/main下手动(依稀记得使用SBT构建的话,不需要手动创建目录)创建java目录并指定为source,test目录同理,创建resources目录【我这里貌似由于模块间的依赖关系,不允许同一module下建2个source,于是我便把java、scala代码放一起了,用包分开】
暂时的依赖关系如下:
测试使用本地win10做driver
代码如下:【仅为测试功能,最简单的WordCount】
PS:注释掉前几行是因为我想把提交运行的代码与计算逻辑的代码分开。
object RemoteWordCount {
def main(args: Array[String]) {
//Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
//Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
//val input = "hdfs://host0.com:8020/user/attop/test_data/movies.dat"
//val output = "file:///E:/tmp20180502"
//val master = "spark://host1.com:7077"
//val conf = new SparkConf().setAppName("WordCountScala").setMaster(master)
val conf = new SparkConf().setAppName("WordCountRemote")
val sc = new SparkContext(conf)
val lines = sc.textFile(args(0))
//lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
val kvWordRDD = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
kvWordRDD.foreach(word => println(word._1 + "出现了" + word._2 + "次。"))
// 排序之后,默认为ascending升序(排序完成再把key-value的位置“恢复”)
val afterSort = kvWordRDD.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
afterSort.foreach(word => println(word._1 + "出现了" + word._2 + "次。"))
afterSort.saveAsTextFile(args(1))
sc.stop()
}
}
~打包:多模块打包管理推荐使用“maven-assembly-plugin” ~
也可参考
注意:pom文件中build的路径,尽量缩小范围,不相干的就别加进去了。避免打包时受其他文件的影响
以前编译scala并打包都是用的SBT,这次在maven-assembly-plugin中行不通了
【主要是我有点“较真”,我希望scala文件与java文件不混合打包,如果混合的话,还打包,是很方便的,但是在scala的代码中,我只想“专注处理数据”(不既处理数据,又提交jar到集群运行),并且单独打包,然后再使用Java或者Scala调用SparkSubmit提交到集群】
猜测该插件只编译java代码,而不编译scala代码
解决方法:maven-scala-plugin【但是这个库7年不更新了。。。几乎没人用】
<!-- scala打包 -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
补充:好慢的!!!比SBT慢太多了!!!
结果如下:
java远程提交代码到集群
sparkContext版本
1.代码
public class RemoteSparkJava {
public static void main(String[] args) {
String fileName = "wordcount";
String[] arg0 = new String[]{
"--master", "spark://172.21.176.51:7078",
"--deploy-mode", "client",
"--class", "lab.ipl.scalacode.RemoteWordCount",
"--executor-memory", "1000m",
"--name", "use which name?",
// "--jar",tmp + "lib/spark_filter.jar",
// 指定jar包
"D:/JavaStudy/LabIpl/ipbdLab/ipl-bigdata/target/ipl-bigdata.jar",
"hdfs://host0.com:8020/user/attop/test_data/movies.dat",
"file:///E:/tmp20180502_" + fileName
};
SparkSubmit.main(arg0);
}
}
2.报错
java.lang.NoClassDefFoundError: org/apache/spark/deploy/SparkSubmit
原因是我在Maven中引入Spark依赖后,就删掉了之前导入的jars,加上即可。
3.报错:
不能连接到7077
- start-slave.sh spark://host_name:7077
- 首先我尝试手动重启spark:./start-master.sh -h 172.21.176.51
-
发觉还是连接不上,于是查看日志,注意到master尝试web UI 连接8080失败:【按道理应该是 8080 for master, 8081 for worker】
- WARN Utils: Service ‘MasterUI’ could not bind on port 8080. Attempting port 8081
到8081页面后发觉master启动在7078
-
修改后再次运行,警告:
- TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
初始化job时没有获取到任何资源;提示检查集群,确保workers可以被注册并有足够的内存资源
- TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
首先,我确保scala代码是运行了的:
(这里的第一个警告: Skip remote jar,没找到合理的解释,TODO)
任务确实是能成功提交的:
- jps找不见worker,查阅livy提交作业估计企业中也不咋用,这篇文章,留着实习了,再更新吧
sparkSession版本
scala远程提交代码到集群
sparkContext版本
sparkSession版本
上一篇: SICP练习1.12生成帕斯卡三角形