欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spark RDD,reduceByKey vs groupByKey

程序员文章站 2022-07-02 22:50:10
Spark 中有两个类似的api,分别是 reduceByKey 和 groupByKey 。这两个的功能类似,但底层实现却有些不同,那么为什么要这样设计呢?我们来从源码的角度分析一下。 先看两者的调用顺序(都是使用默认的Partitioner,即defaultPartitioner) 所用 spa ......

spark 中有两个类似的api,分别是 reducebykey 和 groupbykey 。这两个的功能类似,但底层实现却有些不同,那么为什么要这样设计呢?我们来从源码的角度分析一下。

先看两者的调用顺序(都是使用默认的partitioner,即defaultpartitioner)

所用 spark 版本:spark 2.1.0

先看reducebykey

step1

  def reducebykey(func: (v, v) => v): rdd[(k, v)] = self.withscope {
    reducebykey(defaultpartitioner(self), func)
  }

setp2

  def reducebykey(partitioner: partitioner, func: (v, v) => v): rdd[(k, v)] = self.withscope {
    combinebykeywithclasstag[v]((v: v) => v, func, func, partitioner)
  }

setp3

def combinebykeywithclasstag[c](
      createcombiner: v => c,
      mergevalue: (c, v) => c,
      mergecombiners: (c, c) => c,
      partitioner: partitioner,
      mapsidecombine: boolean = true,
      serializer: serializer = null)(implicit ct: classtag[c]): rdd[(k, c)] = self.withscope {
    require(mergecombiners != null, "mergecombiners must be defined") // required as of spark 0.9.0
    if (keyclass.isarray) {
      if (mapsidecombine) {
        throw new sparkexception("cannot use map-side combining with array keys.")
      }
      if (partitioner.isinstanceof[hashpartitioner]) {
        throw new sparkexception("hashpartitioner cannot partition array keys.")
      }
    }
    val aggregator = new aggregator[k, v, c](
      self.context.clean(createcombiner),
      self.context.clean(mergevalue),
      self.context.clean(mergecombiners))
    if (self.partitioner == some(partitioner)) {
      self.mappartitions(iter => {
        val context = taskcontext.get()
        new interruptibleiterator(context, aggregator.combinevaluesbykey(iter, context))
      }, preservespartitioning = true)
    } else {
      new shuffledrdd[k, v, c](self, partitioner)
        .setserializer(serializer)
        .setaggregator(aggregator)
        .setmapsidecombine(mapsidecombine)
    }
  }

姑且不去看方法里面的细节,我们会只要知道最后调用的是 combinebykeywithclasstag 这个方法。这个方法有两个参数我们来重点看一下,

def combinebykeywithclasstag[c](
      createcombiner: v => c,
      mergevalue: (c, v) => c,
      mergecombiners: (c, c) => c,
      partitioner: partitioner,
      mapsidecombine: boolean = true,
      serializer: serializer = null)

首先是 partitioner 参数 ,这个即是 rdd 的分区设置。除了默认的 defaultpartitioner,spark 还提供了 rangepartitioner 和 hashpartitioner 外,此外用户也可以自定义 partitioner 。通过源码可以发现如果是 hashpartitioner 的话,那么是会抛出一个错误的。

然后是 mapsidecombine 参数 ,这个参数正是 reducebykey 和 groupbykey 最大不同的地方,它决定是是否会先在节点上进行一次 combine 操作,下面会有更具体的例子来介绍。

然后是groupbykey

step1

  def groupbykey(): rdd[(k, iterable[v])] = self.withscope {
    groupbykey(defaultpartitioner(self))
  }

step2

  def groupbykey(partitioner: partitioner): rdd[(k, iterable[v])] = self.withscope {
    // groupbykey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createcombiner = (v: v) => compactbuffer(v)
    val mergevalue = (buf: compactbuffer[v], v: v) => buf += v
    val mergecombiners = (c1: compactbuffer[v], c2: compactbuffer[v]) => c1 ++= c2
    val bufs = combinebykeywithclasstag[compactbuffer[v]](
      createcombiner, mergevalue, mergecombiners, partitioner, mapsidecombine = false)
    bufs.asinstanceof[rdd[(k, iterable[v])]]
  }

setp3

def combinebykeywithclasstag[c](
      createcombiner: v => c,
      mergevalue: (c, v) => c,
      mergecombiners: (c, c) => c,
      partitioner: partitioner,
      mapsidecombine: boolean = true,
      serializer: serializer = null)(implicit ct: classtag[c]): rdd[(k, c)] = self.withscope {
    require(mergecombiners != null, "mergecombiners must be defined") // required as of spark 0.9.0
    if (keyclass.isarray) {
      if (mapsidecombine) {
        throw new sparkexception("cannot use map-side combining with array keys.")
      }
      if (partitioner.isinstanceof[hashpartitioner]) {
        throw new sparkexception("hashpartitioner cannot partition array keys.")
      }
    }
    val aggregator = new aggregator[k, v, c](
      self.context.clean(createcombiner),
      self.context.clean(mergevalue),
      self.context.clean(mergecombiners))
    if (self.partitioner == some(partitioner)) {
      self.mappartitions(iter => {
        val context = taskcontext.get()
        new interruptibleiterator(context, aggregator.combinevaluesbykey(iter, context))
      }, preservespartitioning = true)
    } else {
      new shuffledrdd[k, v, c](self, partitioner)
        .setserializer(serializer)
        .setaggregator(aggregator)
        .setmapsidecombine(mapsidecombine)
    }
  }

结合上面 reducebykey 的调用链,可以发现最终其实都是调用 combinebykeywithclasstag 这个方法的,但调用的参数不同。
reducebykey的调用

combinebykeywithclasstag[v]((v: v) => v, func, func, partitioner)

groupbykey的调用

combinebykeywithclasstag[compactbuffer[v]](
      createcombiner, mergevalue, mergecombiners, partitioner, mapsidecombine = false)

正是两者不同的调用方式导致了两个方法的差别,我们分别来看

  • reducebykey的泛型参数直接是[v],而groupbykey的泛型参数是[compactbuffer[v]]。这直接导致了 reducebykey 和 groupbykey 的返回值不同,前者是rdd[(k, v)],而后者是rdd[(k, iterable[v])]

  • 然后就是mapsidecombine = false 了,这个mapsidecombine 参数的默认是true的。这个值有什么用呢,上面也说了,这个参数的作用是控制要不要在map端进行初步合并(combine)。可以看看下面具体的例子。

spark RDD,reduceByKey vs groupByKey

spark RDD,reduceByKey vs groupByKey

从功能上来说,可以发现 reducebykey 其实就是会在每个节点先进行一次合并的操作,而 groupbykey 没有。

这么来看 reducebykey 的性能会比 groupbykey 好很多,因为有些工作在节点已经处理了。那么 groupbykey 为什么存在,它的应用场景是什么呢?我也不清楚,如果观看这篇文章的读者知道的话不妨在评论里说出来吧。非常感谢!