Spark Source Code Series (2): Dependency & ReduceByKey Source Code
The previous post, Spark Source Code Series (1): RDD, covered where RDDs come from, how they work, and the main Transformation & Action operators. Dependencies and reduceByKey are two topics that deserve a deeper look, so this post focuses on narrow/wide dependencies and the reduceByKey source code.
Dependency
Let's first take an overall look at the Dependency class hierarchy, starting with the base class:
/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
Dependency is an abstract class whose only member, rdd, is the parent RDD.
Next is the NarrowDependency class:
/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
NarrowDependency is also an abstract class. It overrides Dependency's rdd member with the RDD passed into its primary constructor; that RDD is the parent RDD. NarrowDependency also declares getParents(partitionId: Int): Seq[Int], which returns all the parent partitions a given child partition depends on: the argument is a partition id of the child RDD, and because a child partition may depend on several parent partitions, the return type is Seq[Int].
OneToOneDependency overrides getParents. Since child partitions correspond one-to-one to parent partitions, the partition ids are identical and the method simply returns List(partitionId).
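For completeness, the OneToOneDependency class in the Spark source is essentially just this (scaladoc omitted):

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}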
RangeDependency is the second subclass of NarrowDependency:
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
RangeDependency represents a range dependency and is used by UnionRDD. To understand the logic of the getParents override in RangeDependency, let's first look at the getDependencies method in the UnionRDD source:
override def getDependencies: Seq[Dependency[_]] = {
  val deps = new ArrayBuffer[Dependency[_]]
  var pos = 0
  for (rdd <- rdds) {
    deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
    pos += rdd.partitions.length
  }
  deps
}
It iterates over the rdds passed to UnionRDD and creates a RangeDependency for each of them. The parameter inStart is the starting partition id in the parent RDD, which is always 0; outStart is the starting partition id in the child RDD, accumulated in the pos variable.
In RangeDependency's getParents, the two conditions in the if statement check that the given partition id falls within this dependency's range; if it does, the corresponding parent partition id is returned.
For example, suppose we want to find the parent partition id of partition 5 of the union RDD, where the relevant RangeDependency was built with inStart = 0 and outStart = 3. getParents returns List(partitionId - outStart + inStart) = List(5 - 3 + 0) = List(2).
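A minimal sketch to see this mapping in practice (assuming a local SparkContext named sc; the variable names are illustrative):

val rdd1 = sc.parallelize(Seq(1, 2, 3), 3)   // parent partitions 0..2
val rdd2 = sc.parallelize(Seq(4, 5, 6), 3)   // parent partitions 0..2
val unioned = rdd1.union(rdd2)               // child partitions 0..5
// The second RangeDependency was created with inStart = 0, outStart = 3, length = 3,
// so child partition 5 maps back to partition 2 of rdd2.
val dep = unioned.dependencies(1).asInstanceOf[org.apache.spark.RangeDependency[Int]]
println(dep.getParents(5))                   // List(2)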
ShuffleDependency
/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  // obtain a new shuffle id
  val shuffleId: Int = _rdd.context.newShuffleId()

  // register this shuffle with the shuffleManager
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
ShuffleDependency's primary constructor takes the parent RDD, a partitioner, a serializer, a key ordering, an aggregator, and a flag indicating whether to perform map-side combine. Note that ShuffleDependency extends Dependency directly and has no getParents method: in a wide dependency, a single partition of the child RDD may depend on multiple partitions of the parent RDD.
Distinguishing these two kinds of dependencies is useful. First, a narrow dependency allows all parent partitions to be computed in a pipelined fashion on a single cluster node, for example applying map and filter element by element; a wide dependency requires all parent partition data to be computed first and then shuffled between nodes, much like MapReduce. Second, narrow dependencies make recovery from a failed node more efficient, because only the parent partitions of the lost partitions need to be recomputed, and that recomputation can run in parallel on different nodes; with a wide dependency in the lineage, the failure of a single node may cause some partitions to be lost from every ancestor of the RDD, forcing a complete recomputation.
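The difference can be observed directly on an RDD's dependencies. A small sketch (assuming a SparkContext sc is available):

val words = sc.parallelize(Seq("a", "b", "a"))
val mapped = words.map(w => (w, 1))       // narrow: element-wise, can be pipelined
val counted = mapped.reduceByKey(_ + _)   // wide: requires a shuffle
println(mapped.dependencies.head)         // a OneToOneDependency
println(counted.dependencies.head)        // a ShuffleDependency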
Among the Transformation operators, reduceByKey is a typical one that involves a shuffle.
The source provides three overloads of reduceByKey. Each merges all the values of a key in the (K, V) pairs using the user-supplied function. Before the results are sent to the reducer side, this merge is first performed locally on each mapper, i.e. the familiar combine step, which reduces network I/O.
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce.
 * Takes a partitioner and repartitions the output accordingly.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 * Resets the number of partitions.
 */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 * Uses the default partitioner.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}
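For reference, a quick sketch of how the three overloads are typically called (assuming sc; the data is illustrative):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.reduceByKey(_ + _)                            // default partitioner
pairs.reduceByKey(_ + _, 4)                         // hash-partitioned into 4 partitions
pairs.reduceByKey(new HashPartitioner(4), _ + _)    // explicit partitioner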
The source shows that the main logic of reduceByKey lives in combineByKeyWithClassTag[V]:
/**
 * :: Experimental ::
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
 *
 * Users provide three functions:
 *
 * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 * - `mergeCombiners`, to combine two C's into a single one.
 *
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 *
 * @note V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]).
 */
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  // if the RDD is already partitioned by the same partitioner, no shuffle is needed
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // otherwise return a new ShuffledRDD
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
- createCombiner: V => C, turns a V into a C
- mergeValue: (C, V) => C, merges a V into an existing C
- mergeCombiners: (C, C) => C, combines two Cs into one
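To make the role of the three functions concrete, here is a sketch that computes a per-key average, where V = Double and the combined type C = (Double, Int), a running sum and count (assuming sc; the example is illustrative, not from the Spark source). Note that for reduceByKey, createCombiner is simply (v: V) => v and both merge functions are the user-supplied func, so V and C are the same type.

val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))
val avg = scores.combineByKeyWithClassTag(
  (v: Double) => (v, 1),                                     // createCombiner: V => C
  (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),     // mergeValue: (C, V) => C
  (c1: (Double, Int), c2: (Double, Int)) =>                  // mergeCombiners: (C, C) => C
    (c1._1 + c2._1, c1._2 + c2._2)
).mapValues { case (sum, count) => sum / count }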
At the end a new ShuffledRDD is created. Stepping into ShuffledRDD, we find its getDependencies method:
override def getDependencies: Seq[Dependency[_]] = {
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
It ultimately returns List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)). Only here does ShuffleDependency appear; the parameter prev is the RDD on which the shuffle operator was called. So we can say with confidence that reduceByKey involves a shuffle.
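This can also be checked from the lineage; a sketch (assuming sc), where toDebugString reveals the ShuffledRDD introduced by reduceByKey:

val counts = sc.parallelize(Seq("a", "b", "a")).map(w => (w, 1)).reduceByKey(_ + _)
println(counts.toDebugString)
// The output shows a ShuffledRDD at reduceByKey on top of a
// MapPartitionsRDD at map and a ParallelCollectionRDD at parallelize.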