
Implementing WordCount with a Custom Accumulator

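Spark's AccumulatorV2[IN, OUT] base class lets us count words with an accumulator instead of a shuffle: here IN is a single word (String) and OUT is a mutable.HashMap[String, Int] from word to count. A subclass must implement isZero, copy, reset, add (applied on the executors), merge (applied on the driver), and the value getter.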
package util
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Custom accumulator: takes single words (String) and produces a
// word -> count map (mutable.HashMap[String, Int]).
class MyAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {
  // Per-partition (and, after merging, global) word counts.
  private val _hashAcc = new mutable.HashMap[String, Int]()

  // The accumulator is "zero" when nothing has been counted yet.
  override def isZero: Boolean = _hashAcc.isEmpty
  // Spark calls copy() to create a per-task instance; copy the current contents.
  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
    val newAcc = new MyAccumulator()
    newAcc._hashAcc ++= _hashAcc
    newAcc
  }
  // Discard all counts.
  override def reset(): Unit = _hashAcc.clear()

  // Executor side: count one occurrence of the word v.
  override def add(v: String): Unit = {
    _hashAcc.get(v) match {
      case None    => _hashAcc += ((v, 1))
      case Some(a) => _hashAcc += ((v, a + 1))
    }
  }
  // Driver side: fold another partition's counts into this map.
  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
    other match {
      // Match on the concrete class: the generic type parameters are erased
      // at runtime, so matching on AccumulatorV2[String, ...] checks nothing.
      case o: MyAccumulator =>
        for ((k, v) <- o.value) {
          _hashAcc.get(k) match {
            case None    => _hashAcc += ((k, v))
            case Some(a) => _hashAcc += ((k, v + a))
          }
        }
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge with ${other.getClass.getName}")
    }
  }
  // Read on the driver after the action has completed.
  override def value: mutable.HashMap[String, Int] = _hashAcc
}
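With the accumulator defined, the driver registers an instance, feeds it from an action, and then reads the merged result: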
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("demo")
    val sc = new SparkContext(conf)
    val line = sc.parallelize(List("a", "b", "c", "d", "e", "f", "a", "a", "b", "c"))

    // Register the accumulator with the SparkContext before using it in a job.
    val acc = new MyAccumulator()
    sc.register(acc, "test")

    // foreach is an action, so each task's updates are applied exactly once.
    line.foreach(acc.add(_))

    // Print counts in descending order; expected: a,3 b,2 c,2 d,1 e,1 f,1.
    for ((k, v) <- acc.value.toList.sortBy(_._2).reverse) {
      println(k + "," + v)
    }
    sc.stop()
  }
}
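As a sanity check, the same counts can be produced without a custom accumulator, using a plain RDD aggregation. A minimal sketch (the object name WordCountReduceByKey is just an illustration; it assumes the same input list as above):

import org.apache.spark.{SparkConf, SparkContext}

object WordCountReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("demo2")
    val sc = new SparkContext(conf)
    // Classic word count: pair each word with 1, then sum per key.
    val counts = sc.parallelize(List("a", "b", "c", "d", "e", "f", "a", "a", "b", "c"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // Collect to the driver and print in descending order of count.
    for ((k, v) <- counts.collect().toList.sortBy(_._2).reverse) {
      println(k + "," + v)
    }
    sc.stop()
  }
}

The trade-off: reduceByKey shuffles data between executors and returns an RDD, while the accumulator version sends each partition's partial map to the driver and merges there, which is only appropriate when the set of distinct words is small enough to fit in driver memory.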