Implementing WordCount with an Accumulator
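This post shows how to implement WordCount with a custom Spark accumulator: extend AccumulatorV2 so the accumulator carries a word-to-count HashMap, register it with the SparkContext, and feed words into it from a foreach action.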
package util

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

// Custom accumulator: the input type is String (one word), the output
// type is mutable.HashMap[String, Int] (word -> count).
class MyAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {

  private val _hashAcc = new mutable.HashMap[String, Int]()

  // The accumulator is "zero" when no word has been counted yet.
  override def isZero: Boolean = _hashAcc.isEmpty

  // Spark copies the accumulator before shipping it to each task.
  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
    val newAcc = new MyAccumulator()
    newAcc._hashAcc ++= _hashAcc
    newAcc
  }

  override def reset(): Unit = _hashAcc.clear()

  // Runs on the executors: increment the count for one word.
  override def add(v: String): Unit = {
    _hashAcc.get(v) match {
      case None    => _hashAcc += ((v, 1))
      case Some(a) => _hashAcc += ((v, a + 1))
    }
  }

  // Runs on the driver: fold another partition's counts into this one.
  // Matching on MyAccumulator (rather than the erased generic type)
  // avoids an unchecked-match warning.
  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
    other match {
      case o: MyAccumulator =>
        for ((k, v) <- o.value) {
          _hashAcc.get(k) match {
            case None    => _hashAcc += ((k, v))
            case Some(a) => _hashAcc += ((k, v + a))
          }
        }
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  override def value: mutable.HashMap[String, Int] = _hashAcc
}
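A note on how Spark drives these methods: each task receives a zero-valued copy of the registered accumulator (produced via copy() and reset()), calls add() locally on its partition, and the driver folds the finished copies back into the original instance with merge(). One caveat of the implementation above: value returns the internal mutable map directly, so callers can mutate the accumulator's state from outside; returning a clone would be safer.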
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("demo")
    val sc = new SparkContext(conf)

    val line = sc.parallelize(List("a", "b", "c", "d", "e", "f", "a", "a", "b", "c"))

    // Register the accumulator with the SparkContext before using it.
    val acc = new MyAccumulator()
    sc.register(acc, "test")

    // foreach is an action, so add() runs once per element on the executors.
    line.foreach(acc.add(_))

    // Print the word counts in descending order of frequency.
    for ((k, v) <- acc.value.toList.sortBy(_._2).reverse) {
      println(k + "," + v)
    }

    sc.stop()
  }
}
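Run locally, this prints the counts in descending order; for the sample data that works out to a,3 / b,2 / c,2 / d,1 / e,1 / f,1 (the relative order of equal counts may vary, since HashMap iteration order is unspecified).

To count words from a real file instead of an in-memory list, split each line into words before adding them. A minimal sketch, assuming a hypothetical input.txt and a freshly reset accumulator:

// "input.txt" is a hypothetical path; reset the accumulator
// (or register a fresh one) before reusing it for a second count.
val words = sc.textFile("input.txt")
  .flatMap(_.split("\\s+"))   // split each line on whitespace
  .filter(_.nonEmpty)         // drop empty tokens
words.foreach(acc.add(_))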