
Spark Source Code Series (2): Dependency & reduceByKey Source Code

程序员文章站 2024-02-23 09:08:34






```scala
/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
```



```scala
/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
```

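NarrowDependency is an abstract class. It overrides Dependency's rdd member so that it returns the RDD passed to NarrowDependency's primary constructor. That RDD is the parent RDD, so every narrow dependency has to be constructed with its parent. NarrowDependency also declares getParents(partitionId: Int): Seq[Int], which returns the parent-RDD partitions that a given child-RDD partition depends on. The argument is the ID of one child partition, and the result is the IDs of all the parent partitions it reads; since there can be more than one, the return type is Seq[Int] rather than a single Int.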
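To make the getParents contract concrete without a Spark cluster, here is a simplified model of the hierarchy. FakeRDD, Dep, NarrowDep, OneToOneDep and GroupedDep are hypothetical names invented for this sketch, not Spark's API. OneToOneDep mirrors the behavior of Spark's OneToOneDependency (child partition i reads parent partition i), and GroupedDep is an assumed coalesce-style example in which each child partition reads a small, fixed group of consecutive parent partitions.

```scala
// Hypothetical stand-in for an RDD: only its name and partition count matter here.
final case class FakeRDD(name: String, numPartitions: Int)

// Model of Dependency: every dependency knows its parent.
abstract class Dep {
  def rdd: FakeRDD
}

// Model of NarrowDependency: the parent comes from the constructor, and
// getParents maps one child partition ID to the parent partition IDs it reads.
abstract class NarrowDep(parent: FakeRDD) extends Dep {
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: FakeRDD = parent
}

// Behaves like Spark's OneToOneDependency: child i depends only on parent i.
class OneToOneDep(parent: FakeRDD) extends NarrowDep(parent) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

// Hypothetical coalesce-like dependency: child i reads parents
// [i * groupSize, (i + 1) * groupSize), clipped to the parent's partition count.
class GroupedDep(parent: FakeRDD, groupSize: Int) extends NarrowDep(parent) {
  override def getParents(partitionId: Int): Seq[Int] = {
    val start = partitionId * groupSize
    (start until math.min(start + groupSize, rdd.numPartitions)).toList
  }
}
```

In both cases a child partition depends on a small, index-computable set of parent partitions, which is what allows narrow dependencies to be pipelined.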



```scala
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
```

RangeDependency is used by UnionRDD, whose getDependencies method is:

```scala
override def getDependencies: Seq[Dependency[_]] = {
  val deps = new ArrayBuffer[Dependency[_]]
  var pos = 0
  for (rdd <- rdds) {
    deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
    pos += rdd.partitions.length
  }
  deps
}
```

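The method iterates over the rdds passed to UnionRDD and creates one RangeDependency for each of them. The inStart argument is the first partition ID in the parent RDD, which is always 0. The outStart argument is the first partition ID in the child RDD; it is tracked by the pos variable, which grows by each parent's partition count. The length argument is that parent's number of partitions.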
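The index arithmetic can be checked in isolation. The sketch below copies the getParents logic of RangeDependency and the pos accumulation of UnionRDD.getDependencies into plain Scala. UnionModel and RangeDep are hypothetical names, and each parent RDD is represented only by its partition count.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of RangeDependency and UnionRDD.getDependencies.
// UnionModel and RangeDep are hypothetical names, not Spark classes.
object UnionModel {
  final case class RangeDep(inStart: Int, outStart: Int, length: Int) {
    // Same arithmetic as RangeDependency.getParents: a child partition inside
    // [outStart, outStart + length) maps to exactly one parent partition.
    def getParents(partitionId: Int): List[Int] =
      if (partitionId >= outStart && partitionId < outStart + length) {
        List(partitionId - outStart + inStart)
      } else {
        Nil
      }
  }

  // One RangeDep per parent; pos is the child-side offset of that parent's range.
  def unionDeps(parentPartitionCounts: Seq[Int]): List[RangeDep] = {
    val deps = new ArrayBuffer[RangeDep]
    var pos = 0
    for (n <- parentPartitionCounts) {
      deps += RangeDep(0, pos, n)
      pos += n
    }
    deps.toList
  }
}
```

For two parents with 3 and 2 partitions, the union has 5 partitions: child partitions 0 to 2 come from the first parent, and child partition 4 maps to partition 1 of the second parent.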






```scala
/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)
}
```


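The primary constructor of ShuffleDependency takes the parent RDD, a partitioner, a serializer, an optional key ordering, an optional aggregator, and a flag saying whether to perform map-side combine. Unlike NarrowDependency, ShuffleDependency extends Dependency directly and has no getParents method: in a wide dependency, one partition of the child RDD may depend on many (potentially all) partitions of the parent RDD, so there is no small per-partition list of parents to return. Instead, the constructor obtains a new shuffleId and registers the shuffle with the ShuffleManager.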
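A small simulation shows why a per-partition parent list does not fit a shuffle. It assumes hash partitioning by the non-negative remainder of key.hashCode, and represents each parent partition as a Seq of (key, value) pairs. ShuffleModel, partitionOf and contributingParents are hypothetical names for this sketch only.

```scala
// Spark-free simulation of a hash shuffle (hypothetical names).
object ShuffleModel {
  // Assumed hash partitioning: non-negative key.hashCode mod numPartitions.
  def partitionOf(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // For every child partition, collect the IDs of the parent partitions
  // that contain at least one record routed to it.
  def contributingParents[K, V](
      parentPartitions: Seq[Seq[(K, V)]],
      numChildPartitions: Int): Map[Int, Set[Int]] = {
    val edges: Seq[(Int, Int)] = for {
      (records, parentId) <- parentPartitions.zipWithIndex
      (key, _) <- records
    } yield (partitionOf(key, numChildPartitions), parentId)
    edges.groupBy(_._1).map { case (child, pairs) => child -> pairs.map(_._2).toSet }
  }
}
```

With keys 0 to 3 spread over two parent partitions and two child partitions, each child partition receives records from both parents. Which parents feed a child depends on where the keys happen to live, not on a fixed index formula, so the dependency is recorded as a whole shuffle rather than through getParents.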




PairRDDFunctions provides three reduceByKey overloads:

```scala
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce.
 * (Takes a partitioner and repartitions the data according to it.)
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 * (Sets a new number of partitions.)
 */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 * (Uses the default partitioner.)
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}
```
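The second overload simply wraps numPartitions in a HashPartitioner. The sketch below re-implements the key-to-partition rule, assuming it follows Spark's HashPartitioner: null keys go to partition 0, and other keys go to the non-negative remainder of hashCode. SimpleHashPartitioner is a hypothetical stand-in, not Spark's class.

```scala
// Hypothetical re-implementation of hash partitioning as used by
// reduceByKey(func, numPartitions); not Spark's own HashPartitioner.
final class SimpleHashPartitioner(val numPartitions: Int) {
  require(numPartitions >= 0, s"Number of partitions ($numPartitions) cannot be negative.")

  // x % mod is negative for negative x in Scala/Java, so shift it into [0, mod).
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // null keys go to partition 0; all other keys are placed by hashCode.
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

The non-negative adjustment matters because many objects have negative hash codes; for example, the Int key -1 lands in partition 3 of 4 instead of producing the invalid index -1.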

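As the source shows, all three overloads end up in combineByKeyWithClassTag[V], which contains the main logic. reduceByKey passes the identity function (v: V) => v as createCombiner and func as both mergeValue and mergeCombiners: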

```scala
/**
 * :: Experimental ::
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
 * Users provide three functions:
 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 * @note V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]).
 */
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  // Check whether the requested partitioner equals the RDD's existing partitioner
  if (self.partitioner == Some(partitioner)) {
    // Same partitioner: combine within each partition, no shuffle needed
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Different partitioner: return a new ShuffledRDD
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
```

The three user-supplied functions are:

• createCombiner: V => C, turns a V into a C
• mergeValue: (C, V) => C, merges a V into an existing C
• mergeCombiners: (C, C) => C, merges two Cs into one
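The division of labor between the three functions can be sketched with plain in-memory collections. This is a simplified model, not Spark's Aggregator: each inner Seq stands for one input partition, the first phase plays the role of the map-side combine, and the second phase merges the partial results as the reduce side would. CombineModel and its methods are hypothetical names.

```scala
// Spark-free sketch of combineByKey with map-side combine (hypothetical names).
object CombineModel {
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: combine each partition locally into a partial Map[K, C].
    val partials: Seq[Map[K, C]] = partitions.map { records =>
      records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))    // key seen before
          case None    => acc.updated(k, createCombiner(v))   // first value of key
        }
      }
    }
    // Reduce side: merge partial combiners for the same key across partitions.
    partials.foldLeft(Map.empty[K, C]) { (acc, partial) =>
      partial.foldLeft(acc) { case (merged, (k, c)) =>
        merged.get(k) match {
          case Some(existing) => merged.updated(k, mergeCombiners(existing, c))
          case None           => merged.updated(k, c)
        }
      }
    }
  }

  // reduceByKey as combineByKey with an identity createCombiner and func twice.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]], func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions, (v: V) => v, func, func)
}
```

The reduceByKey helper makes the same choice as the reduceByKey source above, so V and C coincide. When they differ, for example C = (sum, count) for computing a per-key average, all three functions do distinct work.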


In ShuffledRDD, getDependencies chooses a serializer (for (K, C) pairs when map-side combine is enabled, otherwise for (K, V) pairs) and builds the dependency:

```scala
override def getDependencies: Seq[Dependency[_]] = {
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
```

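The method finally returns List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)). This is the point where a ShuffleDependency is actually created; prev is the RDD on which the shuffle operator was called. This confirms that reduceByKey involves a shuffle, with one exception: when the input RDD is already partitioned by an equal partitioner, combineByKeyWithClassTag takes the mapPartitions branch and no ShuffledRDD is created.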
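Whether reduceByKey shuffles at all is decided by the self.partitioner == Some(partitioner) check in combineByKeyWithClassTag. The model below reproduces only that decision. ShuffleDecision, HashPart and Plan are hypothetical names; the case class stands in for HashPartitioner on the assumption that two HashPartitioners are equal exactly when their partition counts match.

```scala
// Spark-free sketch of the shuffle-or-not branch (hypothetical names).
object ShuffleDecision {
  // Case-class equality compares numPartitions, standing in for
  // HashPartitioner equality.
  final case class HashPart(numPartitions: Int)

  sealed trait Plan
  case object CombineInPlace extends Plan      // the mapPartitions branch
  case object ShuffleThenCombine extends Plan  // the ShuffledRDD branch

  // existing: the input RDD's current partitioner, if any.
  def plan(existing: Option[HashPart], requested: HashPart): Plan =
    if (existing == Some(requested)) CombineInPlace else ShuffleThenCombine
}
```

Under this model, calling reduceByKey(func, 4) on an RDD already hash-partitioned into 4 partitions combines in place, while asking for 8 partitions, or starting from an RDD with no partitioner, leads to a shuffle.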
