Implementing WordCount with a Custom Accumulator


First version:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object Scala3_ACC {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("acc")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List("spark hadoop", "scala", "java hello scala"))

    // 1. Create the accumulator
    val acc = new WordCountAccumulator

    // 2. Register the accumulator with the SparkContext
    sc.register(acc)

    // 3. Add values to the accumulator on the executors
    rdd.flatMap(_.split(" ")).foreach(
      word => acc.add(word)
    )

    // 4. Read the accumulator's value on the driver
    println(acc.value)

    sc.stop()
    
  }
  // Custom accumulator: IN type is String (a single word), OUT type is mutable.Map[String, Int]
  class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
    private var wordCountMap: mutable.Map[String, Int] = mutable.Map[String, Int]()
    // Method 1: whether the accumulator is in its zero (initial) state
    override def isZero: Boolean = {
      wordCountMap.isEmpty
    }
    // Method 2: create a copy of this accumulator (each task gets its own copy)
    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
      new WordCountAccumulator
    }
    // Method 3: reset the accumulator to its zero state
    override def reset(): Unit = {
      wordCountMap.clear()
    }
    // Method 4: add a value to the accumulator (one of the two key methods)
    override def add(word: String): Unit = {
      wordCountMap(word) = wordCountMap.getOrElse(word, 0) + 1

    }
    // Method 5: merge another accumulator into this one, pairwise; called on the driver to combine the accumulators returned by the executors (the other key method)
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
      val map1 = wordCountMap
      val map2 = other.value

      // Fold map1 into map2: for each (word, count) pair of map1,
      // add the count onto the matching entry of map2.
      wordCountMap = map1.foldLeft(map2)((map, kv) => {
        map(kv._1) = map.getOrElse(kv._1, 0) + kv._2
        map
      })

    }
    // Method 6: return the accumulator's current value
    override def value: mutable.Map[String, Int] = {
      wordCountMap
    }
  }

}
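
For the sample input List("spark hadoop", "scala", "java hello scala"), the printed value should contain each word with its count, for example Map(spark -> 1, hadoop -> 1, scala -> 2, java -> 1, hello -> 1) (the entry order of a mutable.Map is not guaranteed).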

Second version:

package com.atguigu.spark.day04

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import org.junit.{After, Before, Test}

import scala.collection.mutable

class acc1 {

  // Create a local SparkContext for the tests
  val sparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("My app"))

  @Before
  def start(){

    // Delete the output directory before each test if it exists
    val fileSystem: FileSystem = FileSystem.get(new Configuration())

    val path = new Path("output")

    if (fileSystem.exists(path)){
      fileSystem.delete(path,true)
    }

  }

  @After
  def stop(){
    sparkContext.stop()
  }

  @Test
  def wordcount()={
    val rdd:RDD[String]=sparkContext.textFile("input/a.txt")

    // Split each line into words
    val rdd1: RDD[String] = rdd.flatMap(line => line.split(" "))

    val acc = new WCAcc

    sparkContext.register(acc,"wc")

    rdd1.foreach(word => acc.add(word))

    println(acc.value)
  }
}

class WCAcc extends AccumulatorV2[String,mutable.Map[String,Int]] {

  // Backing map that accumulates the word counts
  private val result:mutable.Map[String,Int]=mutable.Map[String,Int]()
  // Whether the accumulator is in its zero (initial) state
  override def isZero: Boolean = result.isEmpty
  // Create a copy of this accumulator
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WCAcc
  // Reset the accumulator
  override def reset(): Unit = result.clear()
  // Add one occurrence of the word
  override def add(v: String): Unit = {
    result.put(v,result.getOrElse(v,0)+1)
  }

  // Merge the other accumulator's data into this accumulator
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {

    val toMergeMap:mutable.Map[String,Int]=other.value

    for ((key,value) <- toMergeMap){
      result.put(key,result.getOrElse(key,0)+value)
    }
  }

  override def value: mutable.Map[String, Int] = result
}
Contents of a.txt:
hi hi hi hi
hello hello hello
nice nice nice
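
Running the wordcount test against this file should print Map(hi -> 4, hello -> 3, nice -> 3) (entry order may vary).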

The two key methods are add (accumulate a value on the executors) and merge (combine accumulators on the driver); this code is worth reviewing. The sketch below shows how Spark drives them.
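
The following is a minimal local sketch, without any Spark cluster, of how the framework uses these methods: each task works on its own copy of the accumulator and adds values, then the driver merges the per-task copies. The object name MergeFlowSketch is only illustrative, and it assumes the WCAcc class above is in scope.

object MergeFlowSketch {
  def main(args: Array[String]): Unit = {
    val driverAcc = new WCAcc

    // Simulate two tasks, each getting its own copy of the accumulator.
    val task1 = driverAcc.copy()
    val task2 = driverAcc.copy()
    "hi hi hello".split(" ").foreach(task1.add)
    "hello nice nice".split(" ").foreach(task2.add)

    // The driver merges the per-task results back into one accumulator.
    driverAcc.merge(task1)
    driverAcc.merge(task2)

    println(driverAcc.value) // Map(hi -> 2, hello -> 2, nice -> 2) (order may vary)
  }
}
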
Custom accumulator WordCount, detailed version:

package com.atguigu.spark.day04

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import org.junit.{After, Before, Test}

import scala.collection.mutable

class acc1 {

  // Create a local SparkContext for the tests
  val sparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("My app"))

  @Before
  def start(){

    // Delete the output directory before each test if it exists
    val fileSystem: FileSystem = FileSystem.get(new Configuration())

    val path = new Path("output")

    if (fileSystem.exists(path)){
      fileSystem.delete(path,true)
    }

  }

  @After
  def stop(){
    sparkContext.stop()
  }

  @Test
  def wordcount()={
    val rdd:RDD[String]=sparkContext.textFile("input/a.txt")

    // Split each line into words
    val rdd1: RDD[String] = rdd.flatMap(line => line.split(" "))

    val acc = new WCAcc

    sparkContext.register(acc,"wc")

    rdd1.foreach(word => acc.add(word))

    println(acc.value)
  }

  @Test
  def systemAcc()={
    val rdd = sparkContext.makeRDD(List(1,2,3,4,5))
    // Declare a built-in long accumulator
    val sum = sparkContext.longAccumulator("sum")
    rdd.foreach(
      num => {
        sum.add(num)
      }
    )
    println("sum="+sum.value)
  }
}

class WCAcc extends AccumulatorV2[String,mutable.Map[String,Int]] {

  // Backing map that accumulates the word counts
  private val result:mutable.Map[String,Int]=mutable.Map[String,Int]()

  override def isZero: Boolean = result.isEmpty

  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WCAcc

  override def reset(): Unit = result.clear()

  override def add(v: String): Unit = {
    result.put(v,result.getOrElse(v,0)+1)
  }

  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {

    val toMergeMap:mutable.Map[String,Int]=other.value

    for ((key,value) <- toMergeMap){
      result.put(key,result.getOrElse(key,0)+value)
    }
  }

  override def value: mutable.Map[String, Int] = result
}

// Custom accumulator:
// 1. Extend AccumulatorV2 and set the type parameters
// 2. Override the accumulator's abstract methods
class WordCountAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]]{

  var map:mutable.Map[String,Long]= mutable.Map()

  // Whether the accumulator is in its zero (initial) state
  override def isZero: Boolean = map.isEmpty

  // Create a copy of this accumulator
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new WordCountAccumulator

  // Reset the accumulator
  override def reset(): Unit = map.clear()

  // Add a value to the accumulator
  override def add(word: String): Unit = {
    // If the word already exists in the map, increase its count by 1;
    // otherwise insert it with a count of 1.
    // Note: map(word) = ... is shorthand for map.update(word, ...)
    map(word) = map.getOrElse(word, 0L) + 1L
  }

  // Merge accumulators
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    val map1 = map
    val map2 = other.value

    // Merge the two maps.
    // foldLeft is a curried method: the first parameter list takes the start
    // value (map2), the second takes a function that is called once per entry
    // of map1. Its parameters are the map accumulated so far (innerMap) and
    // the current (word, count) pair taken from map1 (kv).
    map = map1.foldLeft(map2)(
      (innerMap, kv) => {
        innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
        innerMap
      }
    )
  }

  // Return the accumulator's result
  override def value: mutable.Map[String, Long] = map
}
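
To make the foldLeft used in merge concrete, here is a small standalone sketch of the same pattern outside Spark; the object name FoldLeftMergeSketch and the sample maps are only illustrative.

import scala.collection.mutable

object FoldLeftMergeSketch {
  def main(args: Array[String]): Unit = {
    val map1 = mutable.Map("a" -> 2L, "b" -> 1L)
    val map2 = mutable.Map("b" -> 3L, "c" -> 4L)

    // foldLeft's first parameter list supplies the start value (map2); the
    // second supplies a function that is applied once per entry of map1,
    // threading the accumulated map through each call.
    val merged = map1.foldLeft(map2) { (innerMap, kv) =>
      innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
      innerMap
    }

    println(merged) // Map(a -> 2, b -> 4, c -> 4) (entry order may vary)
  }
}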