Implementing WordCount with a custom accumulator
First version:
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Scala3_ACC {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("acc")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List("spark hadoop", "scala", "java hello scala"))

    // 1. Create the accumulator
    val acc = new WordCountAccumulator
    // 2. Register it with the SparkContext
    sc.register(acc)
    // 3. Use it inside an action
    rdd.flatMap(_.split(" ")).foreach(word => acc.add(word))
    // 4. Read its value back on the driver
    println(acc.value)

    sc.stop()
  }
  // Custom accumulator
  class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {

    private var wordCountMap: mutable.Map[String, Int] = mutable.Map[String, Int]()

    // Method 1: is the accumulator still in its initial (zero) state?
    override def isZero: Boolean = wordCountMap.isEmpty

    // Method 2: create a copy of this accumulator
    override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WordCountAccumulator

    // Method 3: reset the accumulator
    override def reset(): Unit = wordCountMap.clear()

    // Method 4: add a value to the accumulator (one of the two key methods)
    override def add(word: String): Unit = {
      wordCountMap(word) = wordCountMap.getOrElse(word, 0) + 1
    }

    // Method 5: merge another accumulator into this one, pairwise. Called on the
    // driver to combine the accumulators returned by the executors
    // (the other key method)
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
      val map1 = wordCountMap
      val map2 = other.value
      // Fold every (word, count) pair of map1 into map2
      wordCountMap = map1.foldLeft(map2) { (map, kv) =>
        map(kv._1) = map.getOrElse(kv._1, 0) + kv._2
        map
      }
    }

    // Method 6: return the current value of the accumulator
    override def value: mutable.Map[String, Int] = wordCountMap
  }
}
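For the three input lines, the program prints a map equivalent to Map(spark -> 1, hadoop -> 1, scala -> 2, java -> 1, hello -> 1); the entry order of a mutable.Map is unspecified.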
Second version:
package com.atguigu.spark.day04

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import org.junit.{After, Before, Test}

import scala.collection.mutable

class acc1 {

  val sparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("My app"))

  @Before
  def start(): Unit = {
    // Delete the output directory before each test
    val fileSystem: FileSystem = FileSystem.get(new Configuration())
    val path = new Path("output")
    if (fileSystem.exists(path)) {
      fileSystem.delete(path, true)
    }
  }

  @After
  def stop(): Unit = {
    sparkContext.stop()
  }

  @Test
  def wordcount(): Unit = {
    val rdd: RDD[String] = sparkContext.textFile("input/a.txt")
    // Split lines into words
    val rdd1: RDD[String] = rdd.flatMap(line => line.split(" "))

    val acc = new WCAcc
    sparkContext.register(acc, "wc")
    rdd1.foreach(word => acc.add(word))
    println(acc.value)
  }
}
class WCAcc extends AccumulatorV2[String, mutable.Map[String, Int]] {

  // The map that accumulates word counts
  private val result: mutable.Map[String, Int] = mutable.Map[String, Int]()

  // Is the accumulator still in its initial (zero) state?
  override def isZero: Boolean = result.isEmpty

  // Create a copy of this accumulator
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WCAcc

  // Reset the accumulator
  override def reset(): Unit = result.clear()

  // Accumulate one word
  override def add(v: String): Unit = {
    result.put(v, result.getOrElse(v, 0) + 1)
  }

  // Merge other's data into this accumulator
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    val toMergeMap: mutable.Map[String, Int] = other.value
    for ((key, value) <- toMergeMap) {
      result.put(key, result.getOrElse(key, 0) + value)
    }
  }

  override def value: mutable.Map[String, Int] = result
}
Contents of a.txt:
hi hi hi hi
hello hello hello
nice nice nice
The two essential methods are add (accumulate a value into the current accumulator) and merge (combine two accumulators); this code is worth reviewing!
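Running the wordcount test against the a.txt above prints a map equivalent to Map(hi -> 4, hello -> 3, nice -> 3). To see add and merge working together, here is a minimal local sketch (made up for illustration, not part of the original tests):

  val a = new WCAcc
  Seq("hi", "hi", "hello").foreach(a.add)   // a.value: Map(hi -> 2, hello -> 1)

  val b = new WCAcc
  Seq("hi", "nice").foreach(b.add)          // b.value: Map(hi -> 1, nice -> 1)

  // In a real job, Spark calls merge on the driver with each task's accumulator
  a.merge(b)
  println(a.value)                          // Map(hi -> 3, hello -> 1, nice -> 1)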
Detailed version of the custom accumulator WordCount
The file is identical to the second version above (same package, imports, @Before/@After fixtures, and wordcount test), so only the additions are shown. First, a test of Spark's built-in accumulator for comparison:
  @Test
  def systemAcc(): Unit = {
    val rdd = sparkContext.makeRDD(List(1, 2, 3, 4, 5))
    // Declare a built-in accumulator
    val sum = sparkContext.longAccumulator("sum")
    rdd.foreach(num => sum.add(num))
    println("sum=" + sum.value) // sum=15
  }
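Besides longAccumulator, SparkContext also provides doubleAccumulator and collectionAccumulator out of the box; a custom AccumulatorV2 is only needed when the accumulated value is something like the map used here.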
Second, the detailed Long-valued accumulator (the WCAcc class itself is unchanged from the second version):
// Custom accumulator:
// 1. Extend AccumulatorV2 and fix the IN/OUT type parameters
// 2. Override the abstract methods
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

  var map: mutable.Map[String, Long] = mutable.Map()

  // Is the accumulator in its initial state?
  override def isZero: Boolean = map.isEmpty

  // Create a copy of this accumulator
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new WordCountAccumulator

  // Reset the accumulator
  override def reset(): Unit = map.clear()

  // Add a value to the accumulator
  override def add(word: String): Unit = {
    // If the word is already in the map, increment its count;
    // otherwise insert it with a count of 1.
    // map(word) = ... is shorthand for map.update(word, ...)
    map(word) = map.getOrElse(word, 0L) + 1L
  }

  // Merge accumulators
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    val map1 = map
    val map2 = other.value
    // Merge the two maps: foldLeft starts from map2 and folds every
    // (word, count) pair of map1 into it. innerMap is the map built up so
    // far and kv is the current (word, count) pair; foldLeft is curried,
    // taking the start value and the combining function in separate
    // parameter lists.
    map = map1.foldLeft(map2) { (innerMap, kv) =>
      innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
      innerMap
    }
  }

  // Return the accumulator's result
  override def value: mutable.Map[String, Long] = map
}
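To see exactly what the foldLeft merge does, here is a standalone sketch with made-up maps (a REPL-style illustration, not part of the original file):

  import scala.collection.mutable

  val map1 = mutable.Map("a" -> 1L, "b" -> 2L)
  val map2 = mutable.Map("b" -> 10L)

  // Start from map2; for each (key, count) pair in map1, add the count
  // into the map built up so far
  val merged = map1.foldLeft(map2) { (innerMap, kv) =>
    innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
    innerMap
  }

  println(merged) // Map(a -> 1, b -> 12) (entry order unspecified)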