spark中使用不同算子实现wordcount的案例
程序员文章站
2022-04-08 16:07:25
...
数据准备
/**
 * Word-count example driver: builds a local SparkContext and the sample RDD
 * that the operator demonstrations below all read from.
 *
 * @author xianwei
 * @Created 2020/06/05 22:06
 */
object WordCountExer {
  def main(args: Array[String]): Unit = {
    // Run locally, using one worker thread per logical core.
    val conf: SparkConf = new SparkConf().setAppName("file - RDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Sample data: "hello" x3, "spark" x2, "world" and "scala" x1.
    val list = List("hello", "world", "hello", "scala", "hello", "spark", "spark")
    // Distribute the words across 2 partitions.
    val rdd3: RDD[String] = sc.makeRDD(list, 2)
    // TODO: the word-count logic is extracted below, one snippet per operator.
    sc.stop()
  }
}
groupBy
// 1. groupBy: group identical words together, then count each group's size.
val grouped: RDD[(String, Iterable[String])] = rdd3.groupBy(identity)
val res2: RDD[(String, Int)] = grouped.map { case (word, occurrences) => (word, occurrences.size) }
println("1:" + res2.collect().mkString(","))
groupByKey
// 2. groupByKey: pair each word with 1, gather the 1s per key, sum them.
val res3: RDD[(String, Int)] =
  rdd3.map(word => (word, 1)).groupByKey().map { case (word, ones) => (word, ones.sum) }
println("2:" + res3.collect().mkString(","))
reduceByKey
// 3. reduceByKey: merge the values of each key pairwise with addition.
val res4: RDD[(String, Int)] = rdd3.map(word => (word, 1)).reduceByKey((left, right) => left + right)
println("3:" + res4.collect().mkString(","))
aggregateByKey
// 4. aggregateByKey: zero value 0; addition serves both as the
// within-partition seqOp and the cross-partition combOp.
val res5: RDD[(String, Int)] =
  rdd3.map(word => (word, 1)).aggregateByKey(0)((acc, v) => acc + v, (a, b) => a + b)
println("4:" + res5.collect().mkString(","))
foldByKey
// 5. foldByKey: like aggregateByKey, but one function plays both roles.
val res6: RDD[(String, Int)] = rdd3.map(word => (word, 1)).foldByKey(0)((acc, v) => acc + v)
println("5:" + res6.collect().mkString(","))
combineByKey
// 6. combineByKey: createCombiner keeps the first 1 as the initial count;
// mergeValue and mergeCombiners both add.
val res7: RDD[(String, Int)] = rdd3
  .map(word => (word, 1))
  .combineByKey(
    (first: Int) => first,
    (acc: Int, v: Int) => acc + v,
    (a: Int, b: Int) => a + b
  )
println("6:" + res7.collect().mkString(","))
countByKey
// 7. countByKey: count occurrences of each key on the driver.
// Fixed: the original called rdd3.countByValue() on the raw words — that
// yields the same Map[String, Long] here, but it is not the operator this
// section is labeled with; countByKey needs (word, 1) pairs.
val res8: collection.Map[String, Long] = rdd3.map((_, 1)).countByKey()
println("7:" + res8.mkString(","))
cogroup
// 8. cogroup: co-group this RDD's values with rdd4's values by key; the
// word count is the size of the first iterable (the values from rdd3's pairs).
val rdd4: RDD[(String, Int)] = sc.makeRDD(list).map((_, 1))
val rdd5: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd3.map((_, 1)).cogroup(rdd4)
val res9: RDD[(String, Int)] = rdd5.map { case (word, (leftValues, _)) => (word, leftValues.size) }
println("8:" + res9.collect().mkString(","))
aggregate
//9.aggregate
// Aggregate the whole RDD into a single mutable Map of word -> count.
// NOTE(review): using a mutable Map as the zero value relies on Spark
// handing each partition its own copy of `map` — confirm before reusing
// this pattern elsewhere.
val map: mutable.Map[String, Int] = mutable.Map()
val res10: mutable.Map[String, Int] = rdd3.aggregate(map)(
// seqOp: fold one word into the per-partition map in place.
(map, word) => {
map(word) = map.getOrElse(word, 0) + 1
map
},
// combOp: merge two per-partition maps by adding map1's counts into map2.
(map1, map2) => {
map1.foldLeft(map2)(
(m, kv) => {
m(kv._1) = m.getOrElse(kv._1, 0) + kv._2
m
}
)
}
)
println("9:" + res10.mkString(","))
fold
//10.fold
// Map each word to a singleton Map(word -> 1), then fold all the maps into
// one by summing counts into m1 (the accumulator).
// NOTE(review): same mutable-zero-value caveat as the aggregate example —
// assumes each partition folds into its own copy of `mapZero`; confirm.
val mapZero: mutable.Map[String, Int] = mutable.Map()
val res11: mutable.Map[String, Int] = rdd3.map(word => mutable.Map(word -> 1)).fold(mapZero)((m1: mutable.Map[String, Int], m2: mutable.Map[String, Int]) => {
// Merge m2's counts into m1 in place.
m2.foreach(kv=>{
m1(kv._1) = m1.getOrElse(kv._1,0) + kv._2
})
m1
})
println("10:" + res11.mkString(","))
打印结果
1:(scala,1),(hello,3),(world,1),(spark,2)
2:(scala,1),(hello,3),(world,1),(spark,2)
3:(scala,1),(hello,3),(world,1),(spark,2)
4:(scala,1),(hello,3),(world,1),(spark,2)
5:(scala,1),(hello,3),(world,1),(spark,2)
6:(scala,1),(hello,3),(world,1),(spark,2)
7:scala -> 1,hello -> 3,world -> 1,spark -> 2
8:(spark,2),(scala,1),(world,1),(hello,3)
9:spark -> 2,scala -> 1,world -> 1,hello -> 3
10:spark -> 2,scala -> 1,world -> 1,hello -> 3
上一篇: 泛型的学习应用
下一篇: 怎样在不同线程间实现对文件的同步操作