基于spark开发wordcount案例
程序员文章站
2022-07-14 13:58:10
...
spark的WordCount
原理:
数据流分析:
textFile(“in”):读取本地文件in文件夹数据;
flatMap(.split(" ")):压平操作,按照空格分割符将一行数据映射成一个个单词;
map((,1)):对每一个元素操作,将单词映射为元组;
reduceByKey(+):按照key将值进行聚合,相加;
collect:将数据收集到Driver端展示。
package day0904
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//获取配置信息 setMaster设置本地模式 setAppName设置应用程序名
val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
//获取SparkContext上下文对象
val sc = new SparkContext(config)
//从in(可以是目录,可以是文件)中读取文件中的每一行,跟Hadoop取一行
//这块不指定分区数,针对文件来说默认至少两个分区
val lines: RDD[String] = sc.textFile("in")
//转换结构1: 每行的内容以空格作为分割符
val words = lines.flatMap(_.split(" "))
//转换结构2: (oop,1) (spark,1) (mnl,1).... 二元组,key是单词,value是1
val wordToOne: RDD[(String, Int)] = words.map((_, 1))
//转换结构3: 将上述map阶段处理完的数据做聚合操作,相同key做value求和
val wordTosum = wordToOne.reduceByKey(_ + _)
//收集阶段
val result: Array[(String, Int)] = wordTosum.collect()
//控制台打印输出
result.foreach(println)
//结果保存至文件中
wordTosum.saveAsTextFile("outputword")
}
}
运行结果:
部署到yarn集群上:
这里根据电脑选择合适的路径sc.textFile("/input")这是我集群的文件,不过我试过保存至hadoop文件系统乱码,所以 wordTosum.saveAsTextFile(“outputword”)代码我没有加入,打包jar。运行命令
spark-submit --class day0904.WordCount --master yarn --deploy-mode client Spark-1.0-SNAPSHOT-jar-with-dependencies.jar
–class 指定需要运行的Main方法所在类, --master 指定部署模式 yarn client: 这个是说Spark Driver和ApplicationMaster进程均在本机运行,而计算任务在cluster上。还有其他参数我这里并没有加入,跟jar包路径
效果如下:
总结:scala在执行collect()函数(收集)之前不会对数据以及文件做任何操作,这是scala懒加载机制,也是RDD两类算子的区别.
推荐阅读
-
基于spark开发wordcount案例
-
Spark API编程动手实战-08-基于IDEA使用Spark API开发Spark程序-01
-
将java开发的wordcount程序部署到spark集群上运行
-
CDH 开发运行Spark wordcount程序
-
基于Jupyter notebook搭建Spark集群开发环境的详细过程
-
使用Eclipse基于Maven使用Java开发WordCount程序项目
-
Spark开发-WordCount详细讲解
-
spark中使用不同算子实现wordcount的案例
-
基于ARM Cortex-A9四核CPU的exynos4412开发板GPIO编程案例
-
基于以太坊的58同城 | DApp开发与应用案例