欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spark源码解析--Shuffle输出追踪者--MapOutputTracker

程序员文章站 2022-04-08 12:58:06
Shuffle输出追踪者 MapOutputTracker 这个组件作为shuffle的一个辅助组件,在整个shuffle模块中具有很重要的作用。我们在前面一系列的分析中,或多或少都会提到这个组件,比如在DAGScheduler提交一个stage时会将这个stage封装成一个任务集(TaskSet) ......

shuffle输出追踪者--mapoutputtracker

这个组件作为shuffle的一个辅助组件,在整个shuffle模块中具有很重要的作用。我们在前面一系列的分析中,或多或少都会提到这个组件,比如在dagscheduler提交一个stage时会将这个stage封装成一个任务集(taskset),但是可能有的分区已经计算过了,有了结果(stage由于失败可能会多次提交,其中有部分task可能已经计算完成),这些分区就不需要再次计算,而只需要计算那些失败的分区,那么很显然需要有一个组件来维护shuffle过程中的任务失败成功的状态,以及计算结果的位置信息。
此外,在shuffle读取阶段,我们知道一个reduce端的分区会依赖于多个map端的分区的输出数据,那么我们在读取一个reduce分区对应的数据时,就需要知道这个reduce分区依赖哪些map分区,每个block的物理位置是什么,blockid是什么,这个block中属于这个reduce分区的数据量大小是多少,这些信息的记录维护都是靠mapoutputtracker来实现的,所以我们现在知道mapoutputtracker的重要性了。

mapoutputtracker.scala

mapoutputtracker组件的主要功能类和辅助类全部在这个文件中,我先大概说一下各个类的主要作用,然后重点分析关键的类。

  • shufflestatus,这个类是对一个stage的shuffle输出状态的封装,它内部的一个主要的成员mapstatuses是一个数组,这个数组的下标就是map的分区序号,存放了每个map分区的输出情况,关于mapstatus具体可以看mapstatus.scala,这里不打算展开。
  • mapoutputtrackermessage,用于rpc请求的消息类,有两个实现类:getmapoutputstatuses用于获取某次shuffle的所有输出状态;stopmapoutputtracker用于向driver端的发送停止mapoutputtrackermasterendpoint端点的请求。
  • mapoutputtrackermasterendpoint,如果熟悉spark的rpc模块的话,对这个类应该就很熟悉,它就是一个rpc服务端,通过向rpcenv注册自己,通过一个名称标识自己,从而接收到特定一些消息,也就是上面说的两种消息。
  • mapoutputtracker,这个类是一个抽象类,只是定义了一些操作接口,它的一个最重要的作用可能就是内部维护了一个序列值epoch,这个值表示某一个一致的全局map输出状态,一旦有map输出发生变更,这个值就要加一,executor端会同步最新的epoch以判断自己的map输出状态的缓存是否过期。
  • mapoutputtrackermaster,运行在driver端,实现类mapoutputtracker的大部分功能,是最核心的类
  • mapoutputtrackerworker,运行在executor端,主要作用是封装了rpc调用的逻辑。

总的来看,最核心的类是mapoutputtrackermaster,其他的类都是围绕这个类的一些辅助类,所以我们重点分析mapoutputtrackermaster,其他的类我不打算深入展开,相信读者自己也能够较为轻松地理解。

mapoutputtrackermaster

findmissingpartitions

这个方法在上面已经提到了,会在dagscheduler封装任务集的时候查找一个stage需要计算的分区时会调用到。

   def findmissingpartitions(shuffleid: int): option[seq[int]] = {
   shufflestatuses.get(shuffleid).map(_.findmissingpartitions())
   }
  • shufflestatus.findmissingpartitions

      def findmissingpartitions(): seq[int] = synchronized {
      val missing = (0 until numpartitions).filter(id => mapstatuses(id) == null)
      assert(missing.size == numpartitions - _numavailableoutputs,
        s"${missing.size} missing, expected ${numpartitions - _numavailableoutputs}")
      missing
      }

这两段代码很简单,不用多说,就是从map结构中查找。

此外,像registershuffle,registermapoutput,unregistermapoutput,unregistershuffle,removeoutputsonhost等等,我们可以看到这几个方法本身都是很简答的,无非就是对内部map结构的插入,更新和查找,关键的是你要清楚这些方法的调用时机是什么?弄清这一点,会让我们对mapoutputtracker在整个spark框架中的作用和充当的角色有更深的理解。方法的调用地点,通过idea这类ide工具其实都可以很简单地定位到,这里我不做过多展开,仅仅简单地概括一下:

  • registershuffle, dagscheduler在创建一个shufflemapstage时会顺便把这个stage对应的shuffle注册进来。
  • registermapoutput, 在一个shufflemaptask任务完成后,会把map输出的信息注册进来。
  • removeoutputsonhost,将某个host上的相关map输出信息全部移除,一般在主机丢失时调用此操作
  • removeoutputsonexecutor,同样地,将某个executor上的相关map输出信息全部移除,一般在executor丢失时调用此操作

getmapsizesbyexecutorid

我们来看另一个比较重要的方法,在reduce阶段读取数据时,一个task首先需要知道它依赖于哪些map输出,这时它回想driver端的mapoutputtrackermasterendpoint组件发送一个获取map输出的消息,经过一系列方法调用最终会调用这个方法:

def getmapsizesbyexecutorid(shuffleid: int, startpartition: int, endpartition: int)
  : seq[(blockmanagerid, seq[(blockid, long)])] = {
logdebug(s"fetching outputs for shuffle $shuffleid, partitions $startpartition-$endpartition")
shufflestatuses.get(shuffleid) match {
  case some (shufflestatus) =>
    // 将所有的mapstatus数组转换成(blockmanagerid, seq[(blockid, long)])对象
    shufflestatus.withmapstatuses { statuses =>
      mapoutputtracker.convertmapstatuses(shuffleid, startpartition, endpartition, statuses)
    }
  case none =>
    seq.empty
}
}

我们看一下:mapoutputtracker.convertmapstatuses,这个方法也很简单,其实就是将每个map分区输出切分成reduce分区数量,最后产生的(blockid, long)元组数量等于map分区数量*reduce分区数量。

def convertmapstatuses(
  shuffleid: int,
  startpartition: int,
  endpartition: int,
  statuses: array[mapstatus]): seq[(blockmanagerid, seq[(blockid, long)])] = {
assert (statuses != null)
// 用于存放结果
val splitsbyaddress = new hashmap[blockmanagerid, arraybuffer[(blockid, long)]]
// 最后产生的(blockid, long)元组数量等于map分区数量*reduce分区数量
for ((status, mapid) <- statuses.zipwithindex) {
  if (status == null) {
    val errormessage = s"missing an output location for shuffle $shuffleid"
    logerror(errormessage)
    throw new metadatafetchfailedexception(shuffleid, startpartition, errormessage)
  } else {
    for (part <- startpartition until endpartition) {
      splitsbyaddress.getorelseupdate(status.location, arraybuffer()) +=
        ((shuffleblockid(shuffleid, mapid, part), status.getsizeforblock(part)))
    }
  }
}

splitsbyaddress.toseq
}

getpreferredlocationsforshuffle

我们来看另外一个比较重要的方法。我们知道reduce端的分区一般会依赖于多个map端分区输出,但是对于每个map分区依赖的数据量是不同的,举个极端的例子,假设reduce端某个分区依赖于10个map端的输出分区,但是其中一个分区依赖的数据有10000条,而其他分区依赖的数据只有1条,这种情况下,显然我们应该吧这个reduce任务优先调度到那个依赖了10000条的executor上。当然这个例子举得很简单,可能也不是什么准确,但是也足够说明这个方法的作用。

def getpreferredlocationsforshuffle(dep: shuffledependency[_, _, _], partitionid: int)
  : seq[string] = {
// 首先判断几个参数配置,如果都符合条件,那么再进行偏向位置的计算
if (shufflelocalityenabled && dep.rdd.partitions.length < shuffle_pref_map_threshold &&
    dep.partitioner.numpartitions < shuffle_pref_reduce_threshold) {
  // 关键调用
  val blockmanagerids = getlocationswithlargestoutputs(dep.shuffleid, partitionid,
    dep.partitioner.numpartitions, reducer_pref_locs_fraction)
  if (blockmanagerids.nonempty) {
    blockmanagerids.get.map(_.host)
  } else {
    nil
  }
} else {
  nil
}
}

可以看出来,关键的方法是getlocationswithlargestoutputs,接下来,我们就来看一下这个方法:
注释已经说得很清楚,这个方法的逻辑很简单,比如一个reduce端分区要读取的总数据量是100m, 某个executor上的所有map输出中与这个reduce分区相关的数据加起来有20m,即超过了总量的0.2,这时这个executor就能够成为偏向位置,是不是很简单。但是这里应该注意到一个问题,这个方法是以executor为最小单位计算偏向位置,而在前一个方法getpreferredlocationsforshuffle中,获取到成为偏向位置的那些blockmanagerid后,仅仅是取出了host作为偏向位置返回给上层调用者,问题在于一个host(即物理节点)上可能有多个executor,这就会造成返回的结果中会有重复的host,;另外,既然返回host作为偏向位置,那为什么不直接以host作为最小单位来计算偏向位置呢,比如将一个host上所有与这个reduce分区相关的数据加起来,如果超过0.2的占比就认为这个host能够作为偏向位置,这样好像更合理,也更容易产生偏向位置。举个极端的例子,一个host上运行了5个executor,每个executor与分区相关的数据占比0.1,另外有5个host上每个都只运行了一个executor,他们的数据占比均为0.1,这种情况下是不会产生偏向位置的,但是实际上显然应该将那个拥有5个executor的host作为偏向位置。

def getlocationswithlargestoutputs(
  shuffleid: int,
  reducerid: int,
  numreducers: int,
  fractionthreshold: double)
: option[array[blockmanagerid]] = {

val shufflestatus = shufflestatuses.get(shuffleid).ornull
// 对shufflestatus非空检查
if (shufflestatus != null) {
  shufflestatus.withmapstatuses { statuses =>
    // 对mapstatus数组的非空检查
    if (statuses.nonempty) {
      // hashmap to add up sizes of all blocks at the same location
      // 记录每个executor上的所有map输出的block中属于这个reduce端分区的数据量
      val locs = new hashmap[blockmanagerid, long]
      var totaloutputsize = 0l
      var mapidx = 0
      while (mapidx < statuses.length) {
        val status = statuses(mapidx)
        // status may be null here if we are called between registershuffle, which creates an
        // array with null entries for each output, and registermapoutputs, which populates it
        // with valid status entries. this is possible if one thread schedules a job which
        // depends on an rdd which is currently being computed by another thread.
        if (status != null) {
          val blocksize = status.getsizeforblock(reducerid)
          if (blocksize > 0) {
            locs(status.location) = locs.getorelse(status.location, 0l) + blocksize
            totaloutputsize += blocksize
          }
        }
        mapidx = mapidx + 1
      }
      // 最后,判断一个executor能否成为偏向位置的条件是:
      // 这个executor上所有与这个reduce分区相关的数据大小与这个分区数据总量的比值是否大于一个阈值
      // 这个阈值默认是0.2
      val toplocs = locs.filter { case (loc, size) =>
        size.todouble / totaloutputsize >= fractionthreshold
      }
      // return if we have any locations which satisfy the required threshold
      if (toplocs.nonempty) {
        return some(toplocs.keys.toarray)
      }
    }
  }
}
none
}

总结

国际惯例,再晚也要总结一下。我们简单总结一下map输出追踪器的作用:

  • 维护所有shuffle的map输出状态信息,位置信息等
  • 查找某个stage还有哪些未计算的分区
  • 获取reduce分区的偏向位置
  • 获取reduce分区依赖哪些map输出,他们的位置,每个map输出中相关数据的大小