
Differences between reduceByKey, groupByKey, and combineByKey in Spark


reduceByKey

reduceByKey merges the multiple values that share a key. Most importantly, it performs this merge locally within each partition before the shuffle, and the merge logic is a user-defined function.
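A minimal word-count sketch, assuming a local SparkContext named sc (the dataset and names here are illustrative, not from the original example):

// Pairs sharing a key are first combined inside each partition (map-side),
// then only the partial results are shuffled and combined again.
val words = sc.parallelize(Seq("spark", "scala", "spark"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.foreach(println) // prints (scala,1) and (spark,2); order may vary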

groupByKey

groupByKey also operates on the values of each key, but it only gathers them into a single sequence. groupByKey itself cannot take a user-defined function; you first produce the grouped RDD and then apply your own function to it through map.

Comparing the two, groupByKey performs no local (map-side) merge, so every (key, value) pair is shipped across the network. This drives up traffic between cluster nodes and adds transfer latency.
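For contrast, here is the same word count written with groupByKey plus a map step, a sketch under the same assumptions. Every (word, 1) pair crosses the network before any summing happens:

// groupByKey ships all raw values through the shuffle; the combine step
// runs only afterwards, in a separate pass over each group.
val words = sc.parallelize(Seq("spark", "scala", "spark"))
val counts = words.map(w => (w, 1)).groupByKey().mapValues(_.sum)
counts.foreach(println) // same result as reduceByKey, but more data shuffled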

combineByKey
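combineByKey is the general aggregation primitive underneath both operators above. It takes three functions: createCombiner turns the first value seen for a key in a partition into an initial accumulator, mergeValue folds each further value in that partition into the accumulator, and mergeCombiners merges the per-partition accumulators after the shuffle. The example below groups people's heights by gender into a sorted set: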

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Test{
	def main(args: Array[String]): Unit = {
		Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
		Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
		Logger.getLogger("org.spark-project").setLevel(Level.WARN)
		val conf = new SparkConf()
				.setAppName(s"${Test.getClass.getSimpleName}")
				.setMaster("local[*]")
		val sc = new SparkContext(conf)
		val lines = sc.textFile("file:///D:/data/people.txt")
		// Each line looks like "id gender height"; key by gender, value is height
		val peoMapRDD:RDD[(String,String)] = lines.map(line => {
			val fields = line.split("\\s+")
			(fields(1),fields(2))
		})
		peopleOps(sc,peoMapRDD)
		sc.stop()
	}

	def peopleOps(sc: SparkContext,peoMapRDD:RDD[(String,String)]):Unit = {
		// createCombiner: invoked for the first value of a key seen in a partition;
		// starts a sorted set holding that value
		def createCombiner(info:String):mutable.TreeSet[String] = {
			val ple = mutable.TreeSet[String]()
			ple.add(info)
			ple
		}
		// mergeValue: folds each further value in the same partition into the set
		def mergeValue(ple:mutable.TreeSet[String],info:String):mutable.TreeSet[String] = {
			ple.add(info)
			ple
		}
		// mergeCombiners: merges the per-partition sets after the shuffle,
		// keeping the same ascending ordering that createCombiner uses
		def mergeCombiners(ple1:mutable.TreeSet[String],ple2:mutable.TreeSet[String]):mutable.TreeSet[String] = {
			ple1 ++= ple2
			ple1
		}
		val ret = peoMapRDD.combineByKey(createCombiner,mergeValue,mergeCombiners)
		ret.foreach(println)
	}

}

Sample data in people.txt:
1 F 170
2 M 178
3 M 174
4 F 165
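With this input the heights are keyed by gender, so the output should look roughly like the following (the order of the two lines can vary between runs):

(F,TreeSet(165, 170))
(M,TreeSet(174, 178))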
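As a closing note, reduceByKey can be viewed as the special case of combineByKey in which the accumulator has the same type as the value. A sketch, again assuming a local SparkContext named sc:

// reduceByKey(_ + _) behaves like this combineByKey call:
val pairs = sc.parallelize(Seq(("M", 1), ("F", 1), ("M", 1)))
val viaCombine = pairs.combineByKey(
	(v: Int) => v,                  // createCombiner: first value starts the accumulator
	(c: Int, v: Int) => c + v,      // mergeValue: map-side merge within a partition
	(c1: Int, c2: Int) => c1 + c2   // mergeCombiners: merge across partitions
)
viaCombine.foreach(println) // prints (F,1) and (M,2)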
