Differences between reduceByKey, groupByKey, and combineByKey in Spark
reduceByKey
reduceByKey merges the multiple values belonging to each key. Its key advantage is that the merge is first performed locally within each partition (a map-side combine) before any data is shuffled, and the merge function itself is user-defined.
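As a quick illustration, here is a minimal word-count sketch (the sample words and variable names are made up for illustration, and it assumes an already-created SparkContext named sc). Each partition pre-aggregates its own (word, 1) pairs, so only partial sums travel across the network:

// Minimal sketch, assuming an existing SparkContext `sc`.
val words = sc.parallelize(Seq("spark", "hadoop", "spark", "hive", "spark"))
// The reduce function (_ + _) is applied locally in each partition first,
// then again on the shuffled partial results.
val wordCounts = words.map(w => (w, 1)).reduceByKey(_ + _)
wordCounts.collect().foreach(println)   // e.g. (spark,3), (hadoop,1), (hive,1)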
groupByKey
groupByKey also operates per key, but it only gathers the values of each key into a sequence. groupByKey itself takes no custom function, so you must first produce the grouped RDD and only then apply your own function to it with map.
Comparing the two: groupByKey performs no local merging, so every raw key-value pair is shuffled between the cluster's nodes, which increases network overhead and transfer latency.
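For comparison, here is a sketch of the same count written with groupByKey (again assuming the same sc; the data is illustrative). Because groupByKey takes no function of its own, all raw (word, 1) pairs are shuffled first, and the aggregation happens only afterwards via map:

// Minimal sketch, assuming an existing SparkContext `sc`.
val pairs = sc.parallelize(Seq("spark", "hadoop", "spark", "hive", "spark")).map(w => (w, 1))
val grouped = pairs.groupByKey()                        // RDD[(String, Iterable[Int])]
// The custom aggregation is applied only after the full shuffle.
val wordCounts2 = grouped.map { case (w, ones) => (w, ones.sum) }
wordCounts2.collect().foreach(println)                  // same result, but a larger shuffle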
combineByKey
combineByKey is the most general of the three operators (reduceByKey and groupByKey are implemented on top of it). It takes three functions: createCombiner builds the initial combiner the first time a key appears in a partition, mergeValue folds further values of that key within the same partition, and mergeCombiners merges the per-partition combiners after the shuffle. The example below groups people's heights by gender into a TreeSet:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Test {
  def main(args: Array[String]): Unit = {
    // Quiet the framework logging so the job's own output is visible.
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.spark-project").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setAppName(s"${Test.getClass.getSimpleName}")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("file:///D:/data/people.txt")

    // Accumulators for counting by gender; declared but not used in this example.
    val man: LongAccumulator = lines.sparkContext.longAccumulator("man")
    val woman: LongAccumulator = lines.sparkContext.longAccumulator("woman")

    // Each line is "id gender height"; key by gender, value is the height string.
    val peoMapRDD: RDD[(String, String)] = lines.map(line => {
      val files = line.split("\\s+")
      (files(1), files(2))
    })

    peopleOps(sc, peoMapRDD)
    sc.stop()
  }

  def peopleOps(sc: SparkContext, peoMapRDD: RDD[(String, String)]): Unit = {
    // Called the first time a key is seen in a partition: build the initial combiner.
    def createCombiner(info: String): mutable.TreeSet[String] = {
      val ple = mutable.TreeSet[String]()
      ple.add(info)
      ple
    }

    // Called for each further value of a key within the same partition.
    def mergeValue(ple: mutable.TreeSet[String], info: String): mutable.TreeSet[String] = {
      ple.add(info)
      ple
    }

    // Called when combiners from different partitions are merged after the shuffle;
    // here the merged set uses a descending String ordering.
    def mergeCombiners(ple1: mutable.TreeSet[String],
                       ple2: mutable.TreeSet[String]): mutable.TreeSet[String] = {
      val ple = mutable.TreeSet[String]()(new Ordering[String]() {
        override def compare(x: String, y: String): Int = y.compareTo(x)
      })
      for (info <- ple1 ++ ple2) {
        ple.add(info)
      }
      ple
    }

    val ret = peoMapRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
    ret.foreach(println)
  }
}
people.txt data:
1 F 170
2 M 178
3 M 174
4 F 165
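With this input, the job should collect the heights under each gender key, so the output should look roughly like the following two lines (their order is not deterministic, and the ordering inside each set depends on whether mergeCombiners is actually invoked):

(F,TreeSet(165, 170))
(M,TreeSet(174, 178))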