
Spark Operators: aggregateByKey and aggregate Explained


aggregateByKey

aggregateByKey is used the same way as combineByKey. Recall combineByKey's three function parameters:

createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C

Replace createCombiner: V => C with an initial value of type C, and you get aggregateByKey's three parameters:

zeroValue: C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C
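
For a concrete side-by-side, here is a minimal runnable sketch (the sample data is made up for illustration) that computes a per-key (sum, count) pair both ways. One behavioral note: unlike aggregate further below, aggregateByKey feeds zeroValue only into seqOp (once per key per partition), never into combOp.

import org.apache.spark.{SparkConf, SparkContext}

object AggVsCombine {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AggVsCombine").setMaster("local"))
    val pairs = sc.parallelize(Seq(("a", 1.0f), ("a", 3.0f), ("b", 2.0f)))

    // combineByKey: createCombiner builds the initial accumulator from each key's first value
    val byCombine = pairs.combineByKey(
      (v: Float) => (v, 1),
      (acc: (Float, Int), v: Float) => (acc._1 + v, acc._2 + 1),
      (a: (Float, Int), b: (Float, Int)) => (a._1 + b._1, a._2 + b._2))

    // aggregateByKey: the zero value (0.0f, 0) replaces createCombiner
    val byAggregate = pairs.aggregateByKey((0.0f, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))

    println(byCombine.collect().toMap)   // Map(a -> (4.0,2), b -> (2.0,1))
    println(byAggregate.collect().toMap) // same result
    sc.stop()
  }
}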

Source code

/**
  * Under the hood this also calls combineByKeyWithClassTag: the zero value is
  * wrapped into a createZero() factory that seeds seqOp for each key.
  */
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {

    ...

  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
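
The three overloads differ only in how the output partitioning is specified. A quick sketch, assuming pairs is an RDD[(String, Int)]:

import org.apache.spark.HashPartitioner

// All three produce the same values; they differ only in output partitioning.
val sums1 = pairs.aggregateByKey(0)(_ + _, _ + _)                         // default partitioning
val sums2 = pairs.aggregateByKey(0, 4)(_ + _, _ + _)                      // 4 output partitions
val sums3 = pairs.aggregateByKey(0, new HashPartitioner(4))(_ + _, _ + _) // explicit partitioner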

Scala example

def avgScore(): Unit = {
  // code omitted ...

  val avgScoreRdd = studentDetailRdd.aggregateByKey(
    // 1. zeroValue
    // * Key difference: (0.0f, 0) replaces createCombiner: x => (x.score, 1)
    (0.0f, 0))(
    // 2. mergeValue: fold one score into the per-partition (sum, count) accumulator
    (acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1),
    // 3. mergeCombiners: merge (sum, count) accumulators from different partitions
    (acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ).mapValues { case (sum, count) => sum / count }  // 4. sum / count gives the average
}
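
For reference, a self-contained runnable version of the sketch above; the ScoreDetail case class and the sample records are hypothetical stand-ins for the omitted code:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical ScoreDetail used only for this sketch; the original class is omitted above.
case class ScoreDetail(name: String, subject: String, score: Float)

object AvgScore {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AvgScore").setMaster("local"))
    val details = Seq(
      ScoreDetail("xiaoming", "Math", 98),
      ScoreDetail("xiaoming", "English", 88),
      ScoreDetail("lihua", "Math", 95))
    val studentDetailRdd = sc.parallelize(details).map(d => (d.name, d))

    val avgScoreRdd = studentDetailRdd
      .aggregateByKey((0.0f, 0))(
        (acc, d) => (acc._1 + d.score, acc._2 + 1),  // mergeValue
        (a, b) => (a._1 + b._1, a._2 + b._2))        // mergeCombiners
      .mapValues { case (sum, count) => sum / count }

    avgScoreRdd.collect().foreach(println)  // (xiaoming,93.0), (lihua,95.0)
    sc.stop()
  }
}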

Java example

public static void avgScore() {
    /* code omitted; assume pairRDD is a JavaPairRDD<String, ScoreDetail003> keyed by student name */

    // 2. mergeValue: fold one score into the per-partition (sum, count) accumulator
    Function2<Tuple2<Float, Integer>, ScoreDetail003, Tuple2<Float, Integer>> mergeValue =
        new Function2<Tuple2<Float, Integer>, ScoreDetail003, Tuple2<Float, Integer>>() {
            @Override
            public Tuple2<Float, Integer> call(Tuple2<Float, Integer> v1, ScoreDetail003 v2) throws Exception {
                return new Tuple2<>(v1._1() + v2.score, v1._2() + 1);
            }
        };

    // 3. mergeCombiners: merge (sum, count) accumulators from different partitions
    Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>> mergeCombiners =
        new Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>>() {
            @Override
            public Tuple2<Float, Integer> call(Tuple2<Float, Integer> v1, Tuple2<Float, Integer> v2) throws Exception {
                return new Tuple2<>(v1._1() + v2._1(), v1._2() + v2._2());
            }
        };

    // 4. aggregateByKey (with 2 output partitions; numPartitions comes second in the Java API),
    //    then compute the average
    // * Key difference: new Tuple2<>(0.0f, 0) replaces createCombiner
    JavaPairRDD<String, Float> res = pairRDD
        .aggregateByKey(new Tuple2<>(0.0f, 0), 2, mergeValue, mergeCombiners)
        .mapToPair(x -> new Tuple2<>(x._1(), x._2()._1() / x._2()._2()));
}

aggregate

/**
 * Aggregate the elements of each partition, and then the results of all partitions,
 * using the given combine functions and a neutral "zero value".
 *
 * @param zeroValue the initial value for the accumulated result of each partition for the
 *                  `seqOp` operator, and also the initial value for combining the results of
 *                  different partitions for the `combOp` operator; this will typically be the
 *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
 * @param seqOp an operator used to accumulate results within a partition
 * @param combOp an associative operator used to combine results from different partitions
 */

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { ... }

Scala example

def aggregateOp(): Unit = {
  val conf = new SparkConf().setAppName("aggregateOp").setMaster("local")
  val sc = new SparkContext(conf)
  val list = List("aa", "bb", "cc")
  val rdd = sc.parallelize(list)

  // seqOp: joins elements within a partition with "-"
  def seqOp(a: String, b: String): String = {
    a + "-" + b
  }
  // combOp: joins partition results with ":"
  def combOp(a: String, b: String): String = {
    a + ":" + b
  }
  val res = rdd.aggregate("oo")(seqOp, combOp)

  println(res)
}

Result (one partition under master "local"): oo:oo-aa-bb-cc

As the result shows, zeroValue serves as the initial value twice: once for seqOp inside the (single) partition, and once more for combOp when the partition results are combined. In general it is applied once per partition plus once in combOp, so a non-neutral zeroValue shows up multiple times in the output.
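
To make the per-partition role of zeroValue visible, force the same data into two partitions (a sketch reusing sc, list, seqOp, and combOp from above; the order in which combOp consumes partition results is not guaranteed, so the segments may appear in a different order):

val rdd2 = sc.parallelize(list, 2)            // partitions: ["aa"] and ["bb", "cc"]
println(rdd2.aggregate("oo")(seqOp, combOp))  // typically oo:oo-aa:oo-bb-cc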


Java example

public static void aggregateOp() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("aggregateOp");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), 2);

    // 1. seqOp: take the max within each partition
    Function2<Integer, Integer, Integer> seqOp = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return Math.max(v1, v2);
        }
    };
    // 2. combOp: take the min across partition results
    Function2<Integer, Integer, Integer> combOp = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return Math.min(v1, v2);
        }
    };

    // 3. aggregate
    Integer res = rdd1.aggregate(0, seqOp, combOp);

    System.out.println(res);
}

Result: 0. The two partitions' maxima are 4 and 9, but zeroValue 0 also takes part in combOp, so the final value is min(0, 4, 9) = 0.
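
The takeaway, in a quick Scala sketch: zeroValue participates in both seqOp and combOp, so unless it is neutral for both operators it can dominate the result:

val rdd = sc.parallelize(1 to 9, 2)
println(rdd.aggregate(Int.MinValue)(math.max, math.max))  // 9: Int.MinValue is neutral for max
println(rdd.aggregate(0)(math.max, math.min))             // 0: zeroValue distorts the min step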