Spark操作之aggregate、aggregateByKey详解
1. aggregate函数
将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zerovalue)进行combine操作。这个函数最终返回的类型不需要和rdd中元素类型一致。
seqop操作会聚合各分区中的元素,然后combop操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zerovalue. seqop的操作是遍历分区中的所有元素(t),第一个t跟zerovalue做操作,结果再作为与第二个t做操作的zerovalue,直到遍历完整个分区。combop操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟rdd不同类型的值。因此,需要一个操作seqop来把分区中的元素t合并成一个u,另外一个操作combop把所有u聚合。
例子程序:
scala> val rdd = list(1,2,3,4,5,6,7,8,9) rdd: list[int] = list(1, 2, 3, 4, 5, 6, 7, 8, 9) scala> rdd.par.aggregate((0,0))( (acc,number) => (acc._1 + number, acc._2 + 1), (par1,par2) => (par1._1 + par2._1, par1._2 + par2._2) ) res0: (int, int) = (45,9) scala> res0._1 / res0._2 res1: int = 5
过程大概这样:
首先,初始值是(0,0),这个值在后面2步会用到。
然后,(acc,number) => (acc._1 + number, acc._2 + 1),number即是函数定义中的t,这里即是list中的元素。所以acc._1 + number,acc._2 + 1的过程如下。
1. 0+1, 0+1
2. 1+2, 1+1
3. 3+3, 2+1
4. 6+4, 3+1
5. 10+5, 4+1
6. 15+6, 5+1
7. 21+7, 6+1
8. 28+8, 7+1
9. 36+9, 8+1
结果即是(45,9)。这里演示的是单线程计算过程,实际spark执行中是分布式计算,可能会把list分成多个分区,假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9),经过计算各分区的的结果(10,4),(26,4),(9,1),这样,执行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)就是(10+26+9,4+4+1)即(45,9),再求平均值就简单了。
2. aggregatebykey函数:
对pairrdd中相同的key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregatebykey返回值的类型不需要和rdd中value的类型一致。因为aggregatebykey是对相同key中的值进行聚合操作,所以aggregatebykey'函数最终返回的类型还是pairrdd,对应的结果是key和聚合后的值,而aggregate函数直接返回的是非rdd的结果。
例子程序:
import org.apache.spark.sparkconf import org.apache.spark.sparkcontext object aggregatebykeyop { def main(args:array[string]){ val sparkconf: sparkconf = new sparkconf().setappname("aggregatebykey").setmaster("local") val sc: sparkcontext = new sparkcontext(sparkconf) val data=list((1,3),(1,2),(1,4),(2,3)) val rdd=sc.parallelize(data, 2) //合并不同partition中的值,a,b得数据类型为zerovalue的数据类型 def combop(a:string,b:string):string={ println("combop: "+a+"\t"+b) a+b } //合并在同一个partition中的值,a的数据类型为zerovalue的数据类型,b的数据类型为原value的数据类型 def seqop(a:string,b:int):string={ println("seqop:"+a+"\t"+b) a+b } rdd.foreach(println) //zerovalue:中立值,定义返回value的类型,并参与运算 //seqop:用来在同一个partition中合并值 //combop:用来在不同partiton中合并值 val aggregatebykeyrdd=rdd.aggregatebykey("100")(seqop, combop) sc.stop() } }
运行结果:
将数据拆分成两个分区
//分区一数据
(1,3)
(1,2)
//分区二数据
(1,4)
(2,3)//分区一相同key的数据进行合并
seq: 100 3 //(1,3)开始和中立值进行合并 合并结果为 1003
seq: 1003 2 //(1,2)再次合并 结果为 10032//分区二相同key的数据进行合并
seq: 100 4 //(1,4) 开始和中立值进行合并 1004
seq: 100 3 //(2,3) 开始和中立值进行合并 1003将两个分区的结果进行合并
//key为2的,只在一个分区存在,不需要合并 (2,1003)
(2,1003)//key为1的, 在两个分区存在,并且数据类型一致,合并
comb: 10032 1004
(1,100321004)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
下一篇: scala中常用特殊符号详解