
Spark Source Code Series (9): How BlockManager Works


The previous post covered how CacheManager and checkpointing manage cached and persisted data. Under the hood, however, both of them rely on BlockManager to manage the data itself.

In RDD#getOrCompute you will find the call SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, ...), which shows that the data is ultimately managed by BlockManager. Before diving into the source, as usual, here is an overview diagram:

[Figure: BlockManager overall architecture]

First, the driver hosts a BlockManagerMaster. Its job is to manage and maintain the metadata of the BlockManagers on all nodes; every add, remove, update or query of a block is reflected in the metadata kept here.

Every node also runs a BlockManager, which has several key components:

DiskStore: reads and writes block data on disk

MemoryStore: reads and writes block data in memory

BlockTransferService: sets up data transfers to and from remote nodes

After each BlockManager is created, it registers itself with the BlockManagerMaster on the driver, and the BlockManagerMaster creates a corresponding BlockManagerInfo for it.

When reading and writing through a BlockManager, for example for intermediate data produced while an RDD runs, or for data explicitly persisted with persist(), data is written to memory first. If memory is insufficient, an eviction algorithm spills part of the in-memory data to disk. If the persist call specified replication, BlockTransferService replicates the data to the BlockManager of another node.
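
For example, the following minimal sketch (assuming an existing SparkContext sc and an illustrative input path) persists an RDD at a storage level with two replicas, which is exactly what drives the replication path described above:

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK_2 keeps two replicas of every block: the local copy plus one copy
// replicated to another executor's BlockManager through BlockTransferService
val cached = sc.textFile("hdfs:///tmp/input")   // illustrative path
  .map(_.length)
  .persist(StorageLevel.MEMORY_AND_DISK_2)

cached.count()  // the first action materializes the blocks and stores them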

Whenever a BlockManager adds, removes or updates a block, it must report the block's BlockStatus to the BlockManagerMaster, which then updates the blockStatus kept inside that BlockManager's BlockManagerInfo. This is how the metadata stays up to date.

When reading data through a BlockManager, the local DiskStore or MemoryStore is tried first. Only if the data is not available locally is it read from a remote node via BlockTransferService.

-----------------------------------

Let's first look at how a BlockManager gets registered.

A BlockManager is initialized on both the driver and the executors.

On the driver, creating the SparkContext calls _env.blockManager.initialize(_applicationId) to create the BlockManager object.

When an Executor is created, it likewise calls _env.blockManager.initialize(conf.getAppId) to create its BlockManager.

Step into BlockManager#initialize:

/**
 * BlockManager runs on every node (the driver as well as each executor) and provides
 * interfaces for putting and retrieving blocks both locally and remotely,
 * using memory, disk, and off-heap storage (Tachyon).
 */
private[spark] class BlockManager(
    executorId: String,              // the executor this BlockManager runs on
    rpcEnv: RpcEnv,                  // RPC environment for remote communication
    val master: BlockManagerMaster,  // BlockManagerMaster, which manages the BlockManagers of the whole cluster
    val serializerManager: SerializerManager, // default serializer
    val conf: SparkConf,
    memoryManager: MemoryManager,    // memory manager
    mapOutputTracker: MapOutputTracker, // tracks shuffle output
    shuffleManager: ShuffleManager,  // shuffle manager
    val blockTransferService: BlockTransferService, // block-to-block network transfer (used e.g. when replicating)
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

......

  // handles in-memory storage
  private[spark] val memoryStore =
    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
  // handles on-disk storage
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
  memoryManager.setMemoryStore(memoryStore)

......


  def initialize(appId: String): Unit = {
    // Initialize the BlockTransferService; in practice the init method of its subclass
    // NettyBlockTransferService runs here. It sets up the transfer service through which
    // block data can be pulled from other nodes.
    blockTransferService.init(this)
    shuffleClient.init(appId)

    // Set the block replication policy, configured via spark.storage.replication.policy
    blockReplicationPolicy = {
      val priorityClass = conf.get(
        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
      val clazz = Utils.classForName(priorityClass)
      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
      logInfo(s"Using $priorityClass for block replication policy")
      ret
    }

    /**
     * Build a BlockManagerId (the unique identifier of this block store) for the given executor:
     * executorId: the id of the executor
     * blockTransferService.hostName: host name of the service that transfers block data
     * blockTransferService.port: port of the service that transfers block data
     */
    val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

    // Register with the BlockManagerMaster on the driver by calling BlockManagerMaster.registerBlockManager
    val idFromMaster = master.registerBlockManager(
      id,
      maxOnHeapMemory,
      maxOffHeapMemory,
      slaveEndpoint)

    // Update the BlockManagerId (the master may have returned a more complete one)
    blockManagerId = if (idFromMaster != null) idFromMaster else id

    // Check whether the external shuffle service is enabled
    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    // Register Executors' configuration with the local shuffle service, if one should exist.
    // If the external shuffle service is enabled and this node is not the driver,
    // call registerWithExternalShuffleServer to register this BlockManager with the local shuffle server
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }

    logInfo(s"Initialized BlockManager: $blockManagerId")
  }

The registerBlockManager method of BlockManagerMaster sends a RegisterBlockManager message to the driver to perform the registration:

  /**
   * Send a RegisterBlockManager message to the driver to register this BlockManager.
   */
  def registerBlockManager(
      blockManagerId: BlockManagerId,
      maxOnHeapMemSize: Long,
      maxOffHeapMemSize: Long,
      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
    logInfo(s"Registering BlockManager $blockManagerId")
    // Send the BlockManager registration request to the driver.
    // blockManagerId: the unique identifier of the block store; it wraps the executorId the
    //   BlockManager belongs to and the host/port of its Netty transfer service
    // maxOnHeapMemSize / maxOffHeapMemSize: the maximum memory available to it
    val updatedId = driverEndpoint.askSync[BlockManagerId](
      RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
    logInfo(s"Registered BlockManager $updatedId")
    updatedId
  }

The receiveAndReply method of BlockManagerMasterEndpoint receives these request messages and handles them:

 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // When a registration request arrives from a BlockManager on an executor, call register() to handle it
    case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
      context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))

......

}

The register method of BlockManagerMasterEndpoint does the actual registration of an executor's BlockManager:

/**
 * Maintains the metadata (BlockManagerInfo and BlockStatus) for every executor's BlockManager.
 */
private[spark]
class BlockManagerMasterEndpoint(
    override val rpcEnv: RpcEnv,
    val isLocal: Boolean,
    conf: SparkConf,
    listenerBus: LiveListenerBus)
  extends ThreadSafeRpcEndpoint with Logging {
  
  // Mapping from block manager id to the block manager's information.
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  // Mapping from executor ID to block manager ID.
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

......

  /**
   * Returns the BlockManagerId with topology information populated, if available.
   *
   * Registers the BlockManager running on an executor.
   */
  private def register(
      idWithoutTopologyInfo: BlockManagerId,
      maxOnHeapMemSize: Long,
      maxOffHeapMemSize: Long,
      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
    // the dummy id is not expected to contain the topology information.
    // we get that info here and respond back with a more fleshed out block manager id
    // Rebuild the BlockManagerId from the information sent by the executor; the incoming id
    // carries no topology information, so it is filled in here to get a more complete BlockManagerId
    val id = BlockManagerId(
      idWithoutTopologyInfo.executorId,
      idWithoutTopologyInfo.host,
      idWithoutTopologyInfo.port,
      topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))

    val time = System.currentTimeMillis()
    // Check whether this BlockManagerId is already registered (the registry is a
    // HashMap[BlockManagerId, BlockManagerInfo]); if not, proceed with the registration
    if (!blockManagerInfo.contains(id)) {
      /**
       * First check the in-memory map for a BlockManagerId already registered under this executorId.
       * If one exists, call removeExecutor to remove that executor from the BlockManagerMaster:
       * that drops the old BlockManagerId for the executorId and then the old BlockManager itself.
       * In other words, any stale data from a previous registration is cleaned up first.
       */
      blockManagerIdByExecutor.get(id.executorId) match {
        case Some(oldId) =>
          // A block manager of the same executor already exists, so remove it (assumed dead)
          logError("Got two different block manager registrations on same executor - "
              + s" will replace old one $oldId with new one $id")
          removeExecutor(id.executorId)
        case None =>
      }
      logInfo("Registering block manager %s with %s RAM, %s".format(
        id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))

      // Map the executorId to the BlockManagerId in the in-memory registry
      blockManagerIdByExecutor(id.executorId) = id

      // Map the BlockManagerId to a BlockManagerInfo in the in-memory registry.
      // BlockManagerInfo wraps the BlockManagerId, the registration time and the maximum memory
      blockManagerInfo(id) = new BlockManagerInfo(
        id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
    }
    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
        Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
    id
  }

..... 
 
}

Now open BlockManagerInfo.scala:

/**
 * The BlockManagerInfo kept for each BlockManager; it is effectively the BlockManager's metadata.
 */
private[spark] class BlockManagerInfo(
    val blockManagerId: BlockManagerId,
    timeMs: Long,
    val maxOnHeapMem: Long,
    val maxOffHeapMem: Long,
    val slaveEndpoint: RpcEndpointRef)
  extends Logging {

  val maxMem = maxOnHeapMem + maxOffHeapMem

  private var _lastSeenMs: Long = timeMs
  private var _remainingMem: Long = maxMem

  // Mapping from block id to its status.
  // BlockManagerInfo keeps the BlockId -> BlockStatus mapping for the blocks of its BlockManager
  private val _blocks = new JHashMap[BlockId, BlockStatus]

......

}
----------------------------------------------

That covers the BlockManager registration mechanism. Next, let's look at how data is read.

As before, a diagram to outline the overall flow first:

[Figure: overview of the BlockManager read path]

 

Find the RDD#getOrCompute method:

 

SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
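
For context, here is a simplified sketch of the surrounding getOrCompute body (condensed from the Spark 2.x source, with metrics and bookkeeping omitted), showing how the Either returned by getOrElseUpdate is consumed:

// Simplified sketch of RDD#getOrCompute (details omitted)
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    // only invoked when the block is not already cached anywhere
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      // the block was found or was stored successfully: iterate over the cached values
      new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
    case Right(iter) =>
      // the put failed (e.g. too large for memory and not allowed on disk): use the iterator directly
      new InterruptibleIterator(context, iter)
  }
}

getOrElseUpdate itself is implemented as follows: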

  /**
   * If the requested block exists, return it directly; otherwise compute it via makeIterator,
   * persist the result, and return it.
   */
  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // Attempt to read the block from local or remote storage. If it's present, then we don't need
    // to go through the local-get-or-put path.
    // try local storage first, then remote
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
        // Need to compute the block.
    }
    // Initially we hold no locks on this block.
    // neither local nor remote storage had the block: compute it with makeIterator and store the result
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      // None means the put succeeded
      case None =>
        // doPut() didn't hand work back to us, so the block already existed or was successfully
        // stored. Therefore, we now hold a read lock on the block.
        // read the block we just stored from local storage
        val blockResult = getLocalValues(blockId).getOrElse {
          // Since we held a read lock between the doPut() and get() calls, the block should not
          // have been evicted, so get() not returning the block indicates some internal error.
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        // We already hold a read lock on the block from the doPut() call and getLocalValues()
        // acquires the lock again, so we need to call releaseLock() here so that the net number
        // of lock acquisitions is 1 (since the caller will only call release() once).
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) => // the put failed
        // The put failed, likely because the data was too large to fit in memory and could not be
        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
        // that they can decide what to do with the values (e.g. process them without caching).
        Right(iter)
    }
  }

BlockManager's get method:

  /**
   * Entry point for reads: decides whether the block is available locally and reads it directly,
   * or fetches it from a remote node via BlockTransferService.
   */
  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    // try to get the block from local storage
    val local = getLocalValues(blockId)
    // if it was found locally, return it
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
      return local
    }
    // otherwise try to fetch it from a remote node
    val remote = getRemoteValues[T](blockId)
    // return the remote result if found, otherwise None
    if (remote.isDefined) {
      logInfo(s"Found block $blockId remotely")
      return remote
    }
    None
  }

BlockManager's getLocalValues method:

  /**
   * Read a block from local storage: returns a BlockResult if it exists, otherwise None.
   * If the storage level is disk, the block read from disk may also be cached in the memory
   * store so the next read is faster.
   */
  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
    logDebug(s"Getting local block $blockId")
    // ask the block info manager to lock the block for reading and return its metadata (BlockInfo)
    blockInfoManager.lockForReading(blockId) match {
      case None =>  // block not found locally, return None
        logDebug(s"Block $blockId was not found")
        None
      case Some(info) =>  // found the block's metadata
        // get its storage level
        val level = info.level
        logDebug(s"Level for block $blockId is $level")
        val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
        // if the storage level uses memory and the memory store contains this block
        if (level.useMemory && memoryStore.contains(blockId)) {
          // stored deserialized: the entry holds objects, so use getValues
          val iter: Iterator[Any] = if (level.deserialized) {
            memoryStore.getValues(blockId).get
          } else {
            // stored serialized: read the bytes with getBytes() and deserialize the stream
            serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
          }
          // We need to capture the current taskId in case the iterator completion is triggered
          // from a different thread which does not have TaskContext set; see SPARK-18406 for
          // discussion.
          val ci = CompletionIterator[Any, Iterator[Any]](iter, {
            releaseLock(blockId, taskAttemptId)
          })
          // build a BlockResult holding the data, the read method and the size in bytes
          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
        } 
        // if the level uses disk and the disk store contains this block, read it from disk and possibly cache it in memory
        else if (level.useDisk && diskStore.contains(blockId)) {
          // read the raw bytes first
          val diskData = diskStore.getBytes(blockId)
          val iterToReturn: Iterator[Any] = {
            // if the level stores deserialized values, deserialize the stream read from disk
            if (level.deserialized) {
              val diskValues = serializerManager.dataDeserializeStream(
                blockId,
                diskData.toInputStream())(info.classTag)
              // and possibly cache the deserialized values in memory for later reads
              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
            } else {
              // possibly cache the serialized bytes in memory first
              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
                .map { _.toInputStream(dispose = false) }
                .getOrElse { diskData.toInputStream() }
              // then deserialize the resulting stream
              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
            }
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
            releaseLockAndDispose(blockId, diskData, taskAttemptId)
          })
          // build and return the BlockResult
          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
        } else {
          // local read failed: report to the driver that the block is invalid so it will be removed
          handleLocalReadFailure(blockId)
        }
    }
  }

The memoryStore.getValues(blockId) call ends up here:

  // holds every block's data; accessOrder = true makes iteration LRU-ordered, which eviction relies on
  private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

......

  def getValues(blockId: BlockId): Option[Iterator[_]] = {
    // synchronize: entries is accessed concurrently by multiple threads
    val entry = entries.synchronized { entries.get(blockId) }
    entry match {
      case null => None
      case e: SerializedMemoryEntry[_] =>
        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
      case DeserializedMemoryEntry(values, _, _) =>
        val x = Some(values)
        x.map(_.iterator)
    }
  }

  def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    val entry = entries.synchronized { entries.get(blockId) }
    entry match {
      case null => None
      case e: DeserializedMemoryEntry[_] =>
        throw new IllegalArgumentException("should only call getBytes on serialized blocks")
      case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
    }
  }


Disk reads go through diskStore.getBytes(blockId):

  def getBytes(blockId: BlockId): BlockData = {
    val file = diskManager.getFile(blockId.name)
    val blockSize = getSize(blockId)

    securityManager.getIOEncryptionKey() match {
      case Some(key) =>
        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
        // and provides InputStream / FileRegion implementations for reading the data.
        new EncryptedBlockData(file, blockSize, conf, key)

      case _ =>
        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
    }
  }

Now back to BlockManager's get method.

The remote read path is val remote = getRemoteValues[T](blockId):

 /**
   * Get block from remote block managers.
   *
   * This does not acquire a lock on this block in this JVM.
   */
  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val ct = implicitly[ClassTag[T]]
    // deserialize the remotely fetched bytes and wrap them in a BlockResult
    getRemoteBytes(blockId).map { data =>
      val values =
        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
      new BlockResult(values, DataReadMethod.Network, data.size)
    }
  }

Step into getRemoteBytes:

  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
    // could just use the inputStream on the temp file, rather than reading the file into memory.
    // Until then, replication can cause the process to use too much memory and get killed
    // even though we've read the data to disk.
    logDebug(s"Getting remote block $blockId")
    require(blockId != null, "BlockId is null")
    var runningFailureCount = 0
    var totalFailureCount = 0

    // Because all the remote blocks are registered in driver, it is not necessary to ask
    // all the slave executors to get block status.
    // Ask the master for the block's locations: the BlockManagerIds of every node holding this block
    val locationsAndStatus = master.getLocationsAndStatus(blockId)
    val blockSize = locationsAndStatus.map { b =>
      b.status.diskSize.max(b.status.memSize)
    }.getOrElse(0L)
    val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)

    // If the block size is above the threshold, we should pass our FileManger to
    // BlockTransferService, which will leverage it to spill the block; if not, then passed-in
    // null value means the block will be persisted in memory.
    val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
      remoteBlockTempFileManager
    } else {
      null
    }
    val locations = sortLocations(blockLocations)
    // allow at most as many fetch failures as there are block managers holding this block
    val maxFetchFailures = locations.size
    var locationIterator = locations.iterator
    // iterate over the candidate block managers
    while (locationIterator.hasNext) {
      val loc = locationIterator.next()
      logDebug(s"Getting remote block $blockId from $loc")
      // fetch the block from the remote node via BlockTransferService.fetchBlockSync
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager)
      } catch {
        case NonFatal(e) =>
          runningFailureCount += 1
          totalFailureCount += 1
          // if the total number of failures reaches the threshold, give up and return None
          if (totalFailureCount >= maxFetchFailures) {
            // Give up trying anymore locations. Either we've tried all of the original locations,
            // or we've refreshed the list of locations from the master, and have still
            // hit failures after trying locations from the refreshed list.
            logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
              s"Most recent failure cause:", e)
            return None
          }

          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $runningFailureCount)", e)

          // If there is a large number of executors then locations list can contain a
          // large number of stale entries causing a large number of retries that may
          // take a significant amount of time. To get rid of these stale entries
          // we refresh the block locations after a certain number of fetch failures
          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
            locationIterator = sortLocations(master.getLocations(blockId)).iterator
            logDebug(s"Refreshed locations from the driver " +
              s"after ${runningFailureCount} fetch failures.")
            runningFailureCount = 0
          }

          // This location failed, so we retry fetch from a different one by returning null here
          null
      }
      // wrap the fetched data in a ChunkedByteBuffer and return it
      if (data != null) {
        // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
        // ChunkedByteBuffer, to go back to old code-path.  Can be removed post Spark 2.4 if
        // new path is stable.
        if (remoteReadNioBufferConversion) {
          return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
        } else {
          return Some(ChunkedByteBuffer.fromManagedBuffer(data))
        }
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }

blockTransferService.fetchBlockSync is responsible for reading the data from the node identified by a BlockManagerId. Note that BlockManagerId is not a plain field but a class with host, port, executorId and other fields.
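
Purely for illustration (a sketch with made-up values; BlockManagerId is private[spark], so application code does not normally construct one), a block location might look like this, matching the fields fetchBlockSync uses below:

// Hypothetical values, mirroring the BlockManagerId(...) call seen in initialize()
val loc = BlockManagerId("3", "worker-host-1", 43231, None)
loc.executorId  // "3"
loc.host        // "worker-host-1"
loc.port        // 43231

fetchBlockSync itself: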

 def fetchBlockSync(
      host: String,
      port: Int,
      execId: String,
      blockId: String,
      tempFileManager: DownloadFileManager): ManagedBuffer = {
    // A monitor for the thread to wait on.
    // the calling thread blocks on this promise until the fetch completes
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          data match {
            case f: FileSegmentManagedBuffer =>
              result.success(f)
            case _ =>
              val ret = ByteBuffer.allocate(data.size.toInt)
              ret.put(data.nioByteBuffer())
              ret.flip()
              result.success(new NioManagedBuffer(ret))
          }
        }
      }, tempFileManager)
    ThreadUtils.awaitResult(result.future, Duration.Inf)
  }

fetchBlocks is an abstract method; the implementation that actually runs is the one in NettyBlockTransferService:

  /**
   * Fetches remote blocks (for example shuffle files), using the Netty service created by
   * NettyBlockTransferService.
   */
  override def fetchBlocks(
      host: String,
      port: Int,
      execId: String,
      blockIds: Array[String],
      listener: BlockFetchingListener,
      tempFileManager: DownloadFileManager): Unit = {
    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
    try {
      val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
        override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
          // create a client to talk to the remote node and pull the data
          val client = clientFactory.createClient(host, port)
          // fetch the blocks one-for-one over this client
          new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
            transportConf, tempFileManager).start()
        }
      }

      val maxRetries = transportConf.maxIORetries()
      if (maxRetries > 0) {
        // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
        // a bug in this code. We should remove the if statement once we're sure of the stability.
        new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
      } else {
        blockFetchStarter.createAndStart(blockIds, listener)
      }
    } catch {
      case e: Exception =>
        logError("Exception while beginning fetchBlocks", e)
        blockIds.foreach(listener.onBlockFetchFailure(_, e))
    }
  }

new OneForOneBlockFetcher(client, appId, execId, blockIds, listener, transportConf, tempFileManager).start() then sends an RPC message to the executor that holds the blocks:

  public void start() {
    if (blockIds.length == 0) {
      throw new IllegalArgumentException("Zero-sized blockIds array");
    }

    client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        try {
          streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
          logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);

          // Immediately request all chunks -- we expect that the total size of the request is
          // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
          for (int i = 0; i < streamHandle.numChunks; i++) {
            if (downloadFileManager != null) {
              client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
                new DownloadCallback(i));
            } else {
              client.fetchChunk(streamHandle.streamId, i, chunkCallback);
            }
          }
        } catch (Exception e) {
          logger.error("Failed while starting block fetches after success", e);
          failRemainingBlocks(blockIds, e);
        }
      }

      @Override
      public void onFailure(Throwable e) {
        logger.error("Failed while starting block fetches", e);
        failRemainingBlocks(blockIds, e);
      }
    });
  }

The message is received by the receive method of NettyBlockRpcServer on the target executor, which calls getBlockData to read the requested data:

override def receive(
      client: TransportClient,
      rpcMessage: ByteBuffer,
      responseContext: RpcResponseCallback): Unit = {
    val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
    logTrace(s"Received request: $message")

    message match {
      case openBlocks: OpenBlocks =>
        val blocksNum = openBlocks.blockIds.length
        val blocks = for (i <- (0 until blocksNum).view)
          yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
        logTrace(s"Registered streamId $streamId with $blocksNum buffers")
        responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

      case uploadBlock: UploadBlock =>
        // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
        val (level: StorageLevel, classTag: ClassTag[_]) = {
          serializer
            .newInstance()
            .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
            .asInstanceOf[(StorageLevel, ClassTag[_])]
        }
        val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
        val blockId = BlockId(uploadBlock.blockId)
        logDebug(s"Receiving replicated block $blockId with level ${level} " +
          s"from ${client.getSocketAddress}")
        blockManager.putBlockData(blockId, data, level, classTag)
        responseContext.onSuccess(ByteBuffer.allocate(0))
    }
  }

The getBlockData method that reads the data:

 override def getBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
      // For a shuffle block, first get the ShuffleBlockResolver from the ShuffleManager,
      // then call IndexShuffleBlockResolver.getBlockData on it
      shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
    } else {
      getLocalBytes(blockId) match {
        case Some(blockData) =>
          new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
        case None =>
          // If this block manager receives a request for a block that it doesn't have then it's
          // likely that the master has outdated block statuses for this block. Therefore, we send
          // an RPC so that this block is marked as being unavailable from this block manager.
          reportBlockStatus(blockId, BlockStatus.empty)
          throw new BlockNotFoundException(blockId.toString)
      }
    }
  }

----------------------------------------------------------

That covers how BlockManager manages reads; next, how it manages writes.

As usual, a diagram to outline the overall flow:

[Figure: overview of the BlockManager write path]

The entry point is the BlockManager#doPutIterator() method:

  /**
   * Chooses how to write the data according to the storage level, updates the block's status,
   * and finally handles replication if it is required.
   */
  private def doPutIterator[T](
      blockId: BlockId,
      iterator: () => Iterator[T],
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis
      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
      // Size of the block in bytes
      var size = 0L
      // the storage level uses memory
      if (level.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        // store as deserialized objects (level.deserialized)
        if (level.deserialized) {
          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
            case Right(s) =>
              size = s
            case Left(iter) =>
              // Not enough space to unroll this block; drop to disk if applicable
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
                }
                size = diskStore.getSize(blockId)
              } else {
                // hand the iterator back to signal the failure
                iteratorFromFailedMemoryStorePut = Some(iter)
              }
          }
        } else { // !level.deserialized
          // store as serialized bytes
          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
            case Right(s) =>
              size = s
            case Left(partiallySerializedValues) =>
              // Not enough space to unroll this block; drop to disk if applicable
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  partiallySerializedValues.finishWritingToStream(out)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
              }
          }
        }
      // the storage level is disk only
      } else if (level.useDisk) {
        diskStore.put(blockId) { channel =>
          val out = Channels.newOutputStream(channel)
          serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
        }
        size = diskStore.getSize(blockId)
      }
      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // Now that the block is in either the memory or disk store, tell the master about it.
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        // record the updated block status in the task metrics
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
        // replication factor greater than 1: replicate the block to other nodes
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          val bytesToReplicate = doGetLocalBytes(blockId, info)
          // [SPARK-16550] Erase the typed classTag when using default serialization, since
          // NettyBlockRpcServer crashes when deserializing repl-defined classes.
          // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
          val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
            scala.reflect.classTag[Any]
          } else {
            classTag
          }
          try {
            replicate(blockId, bytesToReplicate, level, remoteClassTag)
          } finally {
            bytesToReplicate.dispose()
          }
          logDebug("Put block %s remotely took %s"
            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
        }
      }
      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
      iteratorFromFailedMemoryStorePut
    }
  }

memoryStore.putIteratorAsValues attempts to store the block in memory as deserialized values:

/**
   * Attempt to put the given block in the memory store as deserialized values.
   *
   * The iterator may be too large to materialize in memory. To avoid an OOM, this method
   * unrolls the iterator incrementally while periodically checking whether there is still
   * enough free memory. If the block is stored successfully, the memory reserved during
   * unrolling becomes storage memory, so no extra memory is acquired.
   *
   * @return in case of success, the estimated size of the stored data. In case of failure, return
   *         an iterator containing the values of the block. The returned iterator will be backed
   *         by the combination of the partially-unrolled block and the remaining elements of the
   *         original input iterator. The caller must either fully consume this iterator or call
   *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
   *         block.
   */
  private[storage] def putIteratorAsValues[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

    val valuesHolder = new DeserializedValuesHolder[T](classTag)

    putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
      case Right(storedSize) => Right(storedSize)
      case Left(unrollMemoryUsedByThisBlock) =>
        val unrolledIterator = if (valuesHolder.vector != null) {
          valuesHolder.vector.iterator
        } else {
          valuesHolder.arrayValues.toIterator
        }

        Left(new PartiallyUnrolledIterator(
          this,
          MemoryMode.ON_HEAP,
          unrollMemoryUsedByThisBlock,
          unrolled = unrolledIterator,
          rest = values))
    }
  }
  private def putIterator[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T],
      memoryMode: MemoryMode,
      valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes).
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size
    // growth factor: each request asks for (currentSize * memoryGrowthFactor - memoryThreshold) bytes
    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L

    // Request enough memory to begin unrolling
    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    while (values.hasNext && keepUnrolling) {
      valuesHolder.storeValue(values.next())
      // every memoryCheckPeriod elements (16 by default), check whether the reserved memory has been exceeded
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        val currentSize = valuesHolder.estimatedSize()
        // If our vector's size has exceeded the threshold, request more memory
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          // reserve the additional unroll memory
          keepUnrolling =
            reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
          if (keepUnrolling) {
            unrollMemoryUsedByThisBlock += amountToRequest
          }
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    // Make sure that we have enough memory to store the block. By this point, it is possible that
    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    // perform one final call to attempt to allocate additional memory if necessary.
     
    if (keepUnrolling) {
      val entryBuilder = valuesHolder.getBuilder()
      val size = entryBuilder.preciseSize
      // the block's actual size exceeds the unroll memory reserved so far
      if (size > unrollMemoryUsedByThisBlock) {
        val amountToRequest = size - unrollMemoryUsedByThisBlock
        // reserve the extra space
        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
        if (keepUnrolling) {
          unrollMemoryUsedByThisBlock += amountToRequest
        }
      }
      // the block was fully unrolled; build the in-memory entry for it
      if (keepUnrolling) {
        val entry = entryBuilder.build()
        // Synchronize so that transfer is atomic
        // convert unroll memory into storage memory: release the unroll reservation and acquire storage memory for the block
        memoryManager.synchronized {
          releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
          val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
          assert(success, "transferring unroll memory to storage memory failed")
        }
        // storage memory acquired: insert the entry into the entries map
        entries.synchronized {
          entries.put(blockId, entry)
        }

        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
          Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        Right(entry.size)
      } else {
        // We ran out of space while unrolling the values for this block
        logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
        Left(unrollMemoryUsedByThisBlock)
      }
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
      Left(unrollMemoryUsedByThisBlock)
    }
  }

diskStore.put(blockId) writes the block to disk:

  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
    if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
    }
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val out = new CountingWritableChannel(openForWrite(file))
    var threwException: Boolean = true
    try {
      writeFunc(out)
      blockSizes.put(blockId, out.getCount)
      threwException = false
    } finally {
      try {
        out.close()
      } catch {
        case ioe: IOException =>
          if (!threwException) {
            threwException = true
            throw ioe
          }
      } finally {
         if (threwException) {
          remove(blockId)
        }
      }
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
  }

It calls back the writeFunc passed in as a parameter to write the data into the file; for partially serialized values this is finishWritingToStream:

  def finishWritingToStream(os: OutputStream): Unit = {
    verifyNotConsumedAndNotDiscarded()
    consumed = true
    // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
    ByteStreams.copy(unrolledBuffer.toInputStream(dispose = true), os)
    memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
    redirectableOutputStream.setOutputStream(os)
    while (rest.hasNext) {
      // writeObject writes each remaining element into the serialization stream
      serializationStream.writeObject(rest.next())(classTag)
    }
    serializationStream.close()
  }

--------------------------------------

 
