Spark: Implementing WordCount with an Accumulator to Avoid a Shuffle
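The usual WordCount maps each word to (word, 1) and calls reduceByKey, which repartitions records by key and therefore triggers a shuffle. With an accumulator, each task counts words into its own task-local map on the executor, and Spark merges those partial maps on the driver, so no data moves between executors. The full listing follows; a shuffle-based version is sketched after it for contrast.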
package com.atguigu.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object TestACCWc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[7]").setAppName("MyWordCount")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("hello", "spark", "hello"))
    // Custom accumulator
    // 1. Create the accumulator instance
    val wcAcc = new MyAccumulator
    // 2. Register it with Spark
    sc.register(wcAcc, "wordCountAcc")
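    // Spark ships a copy of the accumulator to each task and merges the per-task
    // copies back on the driver; sc.register is how a custom AccumulatorV2 gets
    // hooked into that machinery.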
    // 3. Use the accumulator inside an action
    rdd.foreach(
      word => {
        wcAcc.add(word)
      }
    )
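    // Read .value on the driver only after the action has run; with the sample
    // input the expected result is Map(hello -> 2, spark -> 1) (ordering may vary).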
    println(wcAcc.value)
  }
  // Extend AccumulatorV2 with two type parameters:
  // IN:  the type passed to add()
  // OUT: the type returned by value
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    private var wcMap = mutable.Map[String, Long]()
    override def isZero: Boolean = wcMap.isEmpty
    // copy() must return a copy carrying the current data, not an empty accumulator
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      val newAcc = new MyAccumulator
      newAcc.wcMap ++= wcMap
      newAcc
    }
    override def reset(): Unit = wcMap.clear()
    override def add(word: String): Unit = {
      // bump the count for this word in the task-local map
      val newCnt: Long = wcMap.getOrElse(word, 0L) + 1
      wcMap.update(word, newCnt)
    }
    // Called on the driver to fold each task's partial map into this one
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      val map1: mutable.Map[String, Long] = this.wcMap
      val map2: mutable.Map[String, Long] = other.value
      // merge the two maps, summing counts for words present in both
      map2.foreach {
        case (word, count) =>
          val newCount = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
      }
    }
    override def value: mutable.Map[String, Long] = wcMap
  }
}
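For contrast, here is a minimal sketch of the conventional shuffle-based WordCount (the object name is illustrative; the reduceByKey step is where the shuffle happens):

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleWc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ShuffleWordCount")
    val sc = new SparkContext(conf)
    sc.makeRDD(List("hello", "spark", "hello"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _) // repartitions by key: the shuffle the accumulator version avoids
      .collect()
      .foreach(println)
    sc.stop()
  }
}

One trade-off worth noting: the accumulator version collects every distinct word into a single in-memory map on the driver, so it suits inputs with modest key cardinality; reduceByKey scales to large key spaces at the cost of the shuffle.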