Spark RDD: reduceByKey vs groupByKey
Spark has two similar APIs, reduceByKey and groupByKey. They do similar things, but their underlying implementations differ somewhat. Why were they designed this way? Let's analyse this from the source code.
Let's first look at the call chain of each (both use the default partitioner, i.e. defaultPartitioner).
Spark version used: Spark 2.1.0
First, reduceByKey.
Step 1
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}
Step 2
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
Step 3
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
Setting aside the details of the method body for now, all we need to know is that the final call lands in combineByKeyWithClassTag. Two of its parameters deserve a closer look:
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)
The first is the partitioner parameter, which determines how the resulting RDD is partitioned. Besides the defaultPartitioner helper (which picks a partitioner automatically), Spark ships with HashPartitioner and RangePartitioner, and users can also implement their own Partitioner. From the source code we can see that array keys are handled specially: if the keys are arrays, map-side combining is rejected, and a HashPartitioner cannot partition them either; both cases throw a SparkException.
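As an aside, writing your own Partitioner only requires implementing numPartitions and getPartition. Here is a minimal sketch; the class name and the partitioning rule are made up purely for illustration:

import org.apache.spark.Partitioner

// Hypothetical example: send keys to partitions based on the first character of their
// string form. Real partitioners usually also override equals/hashCode so Spark can
// recognise two instances as equivalent and avoid unnecessary shuffles.
class FirstCharPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case null => 0
    case k    => math.abs(k.toString.headOption.getOrElse(' ').hashCode) % numPartitions
  }
}

// Usage, assuming pairs: RDD[(String, Int)]
// pairs.reduceByKey(new FirstCharPartitioner(4), _ + _)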
The second is the mapSideCombine parameter, which is where reduceByKey and groupByKey differ the most: it decides whether a combine is first performed locally on each node (on the map side) before the data is shuffled. A more concrete example is given further below.
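Before moving on to groupByKey, it may also help to see what the createCombiner / mergeValue / mergeCombiners trio actually does. Below is a small per-key average computed through the public combineByKey API, which delegates to combineByKeyWithClassTag; sc and the sample data are assumptions made for this sketch:

// Assume sc is an existing SparkContext.
val scores = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))

// The combiner type C is (sum, count).
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner: first value seen for a key in a partition
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // mergeValue: fold another value into the combiner
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners: merge combiners from different partitions
)

val avg = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
// avg.collect() => Array(("a", 2.0), ("b", 2.0)) (order may vary)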
Now for groupByKey.
Step 1
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}
Step 2
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
Step 3
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
Putting this next to the reduceByKey call chain above, we can see that both ultimately call combineByKeyWithClassTag; they just pass different arguments.
The reduceByKey call:
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
The groupByKey call:
combineByKeyWithClassTag[CompactBuffer[V]](
  createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
It is precisely these different arguments that produce the differences between the two methods. Let's look at them one by one.
reduceByKey instantiates the type parameter as [V], while groupByKey uses [CompactBuffer[V]]. This directly leads to different return types: the former returns RDD[(K, V)], while the latter returns RDD[(K, Iterable[V])].
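The different return types are easy to see on a toy pair RDD (a sketch; sc is assumed to be an existing SparkContext):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// combineByKeyWithClassTag[V] => RDD[(K, V)]
val reduced: org.apache.spark.rdd.RDD[(String, Int)] = pairs.reduceByKey(_ + _)
// reduced.collect() => Array(("a", 4), ("b", 2)) (order may vary)

// combineByKeyWithClassTag[CompactBuffer[V]] cast to RDD[(K, Iterable[V])]
val grouped: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = pairs.groupByKey()
// grouped.collect() would show something like Array(("a", CompactBuffer(1, 3)), ("b", CompactBuffer(2)))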
Then there is mapSideCombine = false. This parameter defaults to true. What does it do? As mentioned above, it controls whether a preliminary combine is performed on the map side. See the concrete example below.
Functionally speaking, reduceByKey first performs a combine on each node, while groupByKey does not.
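The classic illustration is word count. Both jobs below produce the same counts, but with reduceByKey each partition first collapses its own (word, 1) records before the shuffle, whereas with groupByKey every single record is shuffled and the summing happens only on the reduce side. This is a sketch; sc and the input data are assumptions:

val words = sc.parallelize(Seq("a", "b", "a", "c", "a", "b")).map(w => (w, 1))

// reduceByKey: map-side combine enabled, partial sums are computed per partition,
// so each map task writes at most one record per key to the shuffle files.
val counts1 = words.reduceByKey(_ + _)

// groupByKey: mapSideCombine = false, every (word, 1) record is shuffled,
// grouping happens only on the reduce side, and the Iterable is summed afterwards.
val counts2 = words.groupByKey().mapValues(_.sum)

// Both give: Array(("a", 3), ("b", 2), ("c", 1)) (order may vary)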
Seen this way, reduceByKey should perform much better than groupByKey, since part of the work has already been done on each node before the shuffle. So why does groupByKey exist, and what are its use cases? I don't know myself; if any reader does, please share it in the comments. Many thanks!