spark源码解析--Shuffle输出追踪者--MapOutputTracker
shuffle输出追踪者--mapoutputtracker
这个组件作为shuffle的一个辅助组件,在整个shuffle模块中具有很重要的作用。我们在前面一系列的分析中,或多或少都会提到这个组件,比如在dagscheduler提交一个stage时会将这个stage封装成一个任务集(taskset),但是可能有的分区已经计算过了,有了结果(stage由于失败可能会多次提交,其中有部分task可能已经计算完成),这些分区就不需要再次计算,而只需要计算那些失败的分区,那么很显然需要有一个组件来维护shuffle过程中的任务失败成功的状态,以及计算结果的位置信息。
此外,在shuffle读取阶段,我们知道一个reduce端的分区会依赖于多个map端的分区的输出数据,那么我们在读取一个reduce分区对应的数据时,就需要知道这个reduce分区依赖哪些map分区,每个block的物理位置是什么,blockid是什么,这个block中属于这个reduce分区的数据量大小是多少,这些信息的记录维护都是靠mapoutputtracker来实现的,所以我们现在知道mapoutputtracker的重要性了。
mapoutputtracker.scala
mapoutputtracker组件的主要功能类和辅助类全部在这个文件中,我先大概说一下各个类的主要作用,然后重点分析关键的类。
- shufflestatus,这个类是对一个stage的shuffle输出状态的封装,它内部的一个主要的成员mapstatuses是一个数组,这个数组的下标就是map的分区序号,存放了每个map分区的输出情况,关于mapstatus具体可以看mapstatus.scala,这里不打算展开。
- mapoutputtrackermessage,用于rpc请求的消息类,有两个实现类:getmapoutputstatuses用于获取某次shuffle的所有输出状态;stopmapoutputtracker用于向driver端的发送停止mapoutputtrackermasterendpoint端点的请求。
- mapoutputtrackermasterendpoint,如果熟悉spark的rpc模块的话,对这个类应该就很熟悉,它就是一个rpc服务端,通过向rpcenv注册自己,通过一个名称标识自己,从而接收到特定一些消息,也就是上面说的两种消息。
- mapoutputtracker,这个类是一个抽象类,只是定义了一些操作接口,它的一个最重要的作用可能就是内部维护了一个序列值epoch,这个值表示某一个一致的全局map输出状态,一旦有map输出发生变更,这个值就要加一,executor端会同步最新的epoch以判断自己的map输出状态的缓存是否过期。
- mapoutputtrackermaster,运行在driver端,实现类mapoutputtracker的大部分功能,是最核心的类
- mapoutputtrackerworker,运行在executor端,主要作用是封装了rpc调用的逻辑。
总的来看,最核心的类是mapoutputtrackermaster,其他的类都是围绕这个类的一些辅助类,所以我们重点分析mapoutputtrackermaster,其他的类我不打算深入展开,相信读者自己也能够较为轻松地理解。
mapoutputtrackermaster
findmissingpartitions
这个方法在上面已经提到了,会在dagscheduler封装任务集的时候查找一个stage需要计算的分区时会调用到。
def findmissingpartitions(shuffleid: int): option[seq[int]] = { shufflestatuses.get(shuffleid).map(_.findmissingpartitions()) }
-
shufflestatus.findmissingpartitions
def findmissingpartitions(): seq[int] = synchronized { val missing = (0 until numpartitions).filter(id => mapstatuses(id) == null) assert(missing.size == numpartitions - _numavailableoutputs, s"${missing.size} missing, expected ${numpartitions - _numavailableoutputs}") missing }
这两段代码很简单,不用多说,就是从map结构中查找。
此外,像registershuffle,registermapoutput,unregistermapoutput,unregistershuffle,removeoutputsonhost等等,我们可以看到这几个方法本身都是很简答的,无非就是对内部map结构的插入,更新和查找,关键的是你要清楚这些方法的调用时机是什么?弄清这一点,会让我们对mapoutputtracker在整个spark框架中的作用和充当的角色有更深的理解。方法的调用地点,通过idea这类ide工具其实都可以很简单地定位到,这里我不做过多展开,仅仅简单地概括一下:
- registershuffle, dagscheduler在创建一个shufflemapstage时会顺便把这个stage对应的shuffle注册进来。
- registermapoutput, 在一个shufflemaptask任务完成后,会把map输出的信息注册进来。
- removeoutputsonhost,将某个host上的相关map输出信息全部移除,一般在主机丢失时调用此操作
- removeoutputsonexecutor,同样地,将某个executor上的相关map输出信息全部移除,一般在executor丢失时调用此操作
getmapsizesbyexecutorid
我们来看另一个比较重要的方法,在reduce阶段读取数据时,一个task首先需要知道它依赖于哪些map输出,这时它回想driver端的mapoutputtrackermasterendpoint组件发送一个获取map输出的消息,经过一系列方法调用最终会调用这个方法:
def getmapsizesbyexecutorid(shuffleid: int, startpartition: int, endpartition: int) : seq[(blockmanagerid, seq[(blockid, long)])] = { logdebug(s"fetching outputs for shuffle $shuffleid, partitions $startpartition-$endpartition") shufflestatuses.get(shuffleid) match { case some (shufflestatus) => // 将所有的mapstatus数组转换成(blockmanagerid, seq[(blockid, long)])对象 shufflestatus.withmapstatuses { statuses => mapoutputtracker.convertmapstatuses(shuffleid, startpartition, endpartition, statuses) } case none => seq.empty } }
我们看一下:mapoutputtracker.convertmapstatuses,这个方法也很简单,其实就是将每个map分区输出切分成reduce分区数量,最后产生的(blockid, long)元组数量等于map分区数量*reduce分区数量。
def convertmapstatuses( shuffleid: int, startpartition: int, endpartition: int, statuses: array[mapstatus]): seq[(blockmanagerid, seq[(blockid, long)])] = { assert (statuses != null) // 用于存放结果 val splitsbyaddress = new hashmap[blockmanagerid, arraybuffer[(blockid, long)]] // 最后产生的(blockid, long)元组数量等于map分区数量*reduce分区数量 for ((status, mapid) <- statuses.zipwithindex) { if (status == null) { val errormessage = s"missing an output location for shuffle $shuffleid" logerror(errormessage) throw new metadatafetchfailedexception(shuffleid, startpartition, errormessage) } else { for (part <- startpartition until endpartition) { splitsbyaddress.getorelseupdate(status.location, arraybuffer()) += ((shuffleblockid(shuffleid, mapid, part), status.getsizeforblock(part))) } } } splitsbyaddress.toseq }
getpreferredlocationsforshuffle
我们来看另外一个比较重要的方法。我们知道reduce端的分区一般会依赖于多个map端分区输出,但是对于每个map分区依赖的数据量是不同的,举个极端的例子,假设reduce端某个分区依赖于10个map端的输出分区,但是其中一个分区依赖的数据有10000条,而其他分区依赖的数据只有1条,这种情况下,显然我们应该吧这个reduce任务优先调度到那个依赖了10000条的executor上。当然这个例子举得很简单,可能也不是什么准确,但是也足够说明这个方法的作用。
def getpreferredlocationsforshuffle(dep: shuffledependency[_, _, _], partitionid: int) : seq[string] = { // 首先判断几个参数配置,如果都符合条件,那么再进行偏向位置的计算 if (shufflelocalityenabled && dep.rdd.partitions.length < shuffle_pref_map_threshold && dep.partitioner.numpartitions < shuffle_pref_reduce_threshold) { // 关键调用 val blockmanagerids = getlocationswithlargestoutputs(dep.shuffleid, partitionid, dep.partitioner.numpartitions, reducer_pref_locs_fraction) if (blockmanagerids.nonempty) { blockmanagerids.get.map(_.host) } else { nil } } else { nil } }
可以看出来,关键的方法是getlocationswithlargestoutputs,接下来,我们就来看一下这个方法:
注释已经说得很清楚,这个方法的逻辑很简单,比如一个reduce端分区要读取的总数据量是100m, 某个executor上的所有map输出中与这个reduce分区相关的数据加起来有20m,即超过了总量的0.2,这时这个executor就能够成为偏向位置,是不是很简单。但是这里应该注意到一个问题,这个方法是以executor为最小单位计算偏向位置,而在前一个方法getpreferredlocationsforshuffle中,获取到成为偏向位置的那些blockmanagerid后,仅仅是取出了host作为偏向位置返回给上层调用者,问题在于一个host(即物理节点)上可能有多个executor,这就会造成返回的结果中会有重复的host,;另外,既然返回host作为偏向位置,那为什么不直接以host作为最小单位来计算偏向位置呢,比如将一个host上所有与这个reduce分区相关的数据加起来,如果超过0.2的占比就认为这个host能够作为偏向位置,这样好像更合理,也更容易产生偏向位置。举个极端的例子,一个host上运行了5个executor,每个executor与分区相关的数据占比0.1,另外有5个host上每个都只运行了一个executor,他们的数据占比均为0.1,这种情况下是不会产生偏向位置的,但是实际上显然应该将那个拥有5个executor的host作为偏向位置。
def getlocationswithlargestoutputs( shuffleid: int, reducerid: int, numreducers: int, fractionthreshold: double) : option[array[blockmanagerid]] = { val shufflestatus = shufflestatuses.get(shuffleid).ornull // 对shufflestatus非空检查 if (shufflestatus != null) { shufflestatus.withmapstatuses { statuses => // 对mapstatus数组的非空检查 if (statuses.nonempty) { // hashmap to add up sizes of all blocks at the same location // 记录每个executor上的所有map输出的block中属于这个reduce端分区的数据量 val locs = new hashmap[blockmanagerid, long] var totaloutputsize = 0l var mapidx = 0 while (mapidx < statuses.length) { val status = statuses(mapidx) // status may be null here if we are called between registershuffle, which creates an // array with null entries for each output, and registermapoutputs, which populates it // with valid status entries. this is possible if one thread schedules a job which // depends on an rdd which is currently being computed by another thread. if (status != null) { val blocksize = status.getsizeforblock(reducerid) if (blocksize > 0) { locs(status.location) = locs.getorelse(status.location, 0l) + blocksize totaloutputsize += blocksize } } mapidx = mapidx + 1 } // 最后,判断一个executor能否成为偏向位置的条件是: // 这个executor上所有与这个reduce分区相关的数据大小与这个分区数据总量的比值是否大于一个阈值 // 这个阈值默认是0.2 val toplocs = locs.filter { case (loc, size) => size.todouble / totaloutputsize >= fractionthreshold } // return if we have any locations which satisfy the required threshold if (toplocs.nonempty) { return some(toplocs.keys.toarray) } } } } none }
总结
国际惯例,再晚也要总结一下。我们简单总结一下map输出追踪器的作用:
- 维护所有shuffle的map输出状态信息,位置信息等
- 查找某个stage还有哪些未计算的分区
- 获取reduce分区的偏向位置
- 获取reduce分区依赖哪些map输出,他们的位置,每个map输出中相关数据的大小
上一篇: python中的随机数生成
下一篇: python —— 进程