欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spark存储管理之磁盘存储--DiskStore

程序员文章站 2022-07-01 14:49:31
DiskStore 接着上一篇,本篇,我们分析一下实现磁盘存储的功能类DiskStore,这个类相对简单。在正式展开之前,我觉得有必要大概分析一下BlockManager的背景,或者说它的运行环境,运行的作用范围。Blockmanager这个类其实在运行时的每个节点都会有一个实例(包括driver和 ......

diskstore

接着上一篇,本篇,我们分析一下实现磁盘存储的功能类diskstore,这个类相对简单。在正式展开之前,我觉得有必要大概分析一下blockmanager的背景,或者说它的运行环境,运行的作用范围。blockmanager这个类其实在运行时的每个节点都会有一个实例(包括driver和executor进程),因为不论是driver端进行广播变量的创建,还是executor端shuffle过程中写shuffle块,或者是任务运行时结果太大需要通过blockmanager传输,或者是rdd的缓存,其实在每个运行节点上都会通过blockmanager来管理程序内部对于本地的内存和磁盘的读写,所以综上,我想表达的核心意思就是每个进程(driver和executor)都有一blockmanager实例,而这些blockmanager实例是通过blockmanagerid类来进行唯一区分的,blockmanagerid实际上是对进程物理位置的封装。

diskstore.put

首先我们来看一个最常用的写入方法

def put(blockid: blockid)(writefunc: writablebytechannel => unit): unit = {
    // 通过diskblockmanager对象检查这个blockid对应的文件名的文件是否存在
    if (contains(blockid)) {
      throw new illegalstateexception(s"block $blockid is already present in the disk store")
    }
    logdebug(s"attempting to put block $blockid")
    val starttime = system.currenttimemillis
    // 通过diskblockmanager获取一个文件用于写入数据
    val file = diskmanager.getfile(blockid)
    // 用countingwritablechannel包装一下,以便于记录写入的字节数
    val out = new countingwritablechannel(openforwrite(file))
    var threwexception: boolean = true
    try {
      writefunc(out)
      // 关键步骤,记录到内部的map结构中
      blocksizes.put(blockid, out.getcount)
      threwexception = false
    } finally {
      try {
        out.close()
      } catch {
        case ioe: ioexception =>
          if (!threwexception) {
            threwexception = true
            throw ioe
          }
      } finally {
         if (threwexception) {
          remove(blockid)
        }
      }
    }
    val finishtime = system.currenttimemillis
    logdebug("block %s stored as %s file on disk in %d ms".format(
      file.getname,
      utils.bytestostring(file.length()),
      finishtime - starttime))
  }

这个方法很简单,没什么好说的,但是调用了一个比较重要的类diskblockmanager,这个类的功能就是对磁盘上的目录和文件进行管理,会在磁盘上按照一定规则创建一些目录和子目录,在分配文件名时也会尽量均匀第分配在这些目录和子目录下。

diskstore.putbytes

这个方法就不说了,简单处理一下直接调用put方法。

  def putbytes(blockid: blockid, bytes: chunkedbytebuffer): unit = {
    put(blockid) { channel =>
      bytes.writefully(channel)
    }
  }

diskstore.getbytes

我们来看一下这个方法,首先通过diskblockmanager获取对应的文件名,然后将其包装成一个blockdata对象,分为加密和不加密两种。

  def getbytes(blockid: blockid): blockdata = {
    val file = diskmanager.getfile(blockid.name)
    val blocksize = getsize(blockid)
  
    securitymanager.getioencryptionkey() match {
      case some(key) =>
        // encrypted blocks cannot be memory mapped; return a special object that does decryption
        // and provides inputstream / fileregion implementations for reading the data.
        new encryptedblockdata(file, blocksize, conf, key)
  
      case _ =>
        // 看一下diskblockdata
        new diskblockdata(minmemorymapbytes, maxmemorymapbytes, file, blocksize)
    }
  }

diskblockdata

这个类作为磁盘文件的包装类,主要功能是提供了几个方便的接口,将磁盘文件中的数据读取出来并生成缓冲对象。
这个类中有两个重要的方法tochunkedbytebuffer和tobytebuffer,tobytebuffer就不说了,调用readablebytechannel.read(bytebuffer dst)方法读取文件数据,我们看一下tochunkedbytebuffer

diskblockdata.tochunkedbytebuffer

这个方法也很简单,在数据量比较大的时候,由于每次申请的内存块大小有限制maxmemorymapbytes,所以需要切分成多个块

  override def tochunkedbytebuffer(allocator: (int) => bytebuffer): chunkedbytebuffer = {
    // utils.trywithresource调用保证在使用完资源后关闭资源
    // 基本等同于java中的try{}finally{}
    utils.trywithresource(open()) { channel =>
      var remaining = blocksize
      val chunks = new listbuffer[bytebuffer]()
      while (remaining > 0) {
        // 这里取剩余大小和maxmemorymapbytes的较小值,
        // 也就是说每次申请的内存块大小不超过maxmemorymapbytes
        val chunksize = math.min(remaining, maxmemorymapbytes)
        val chunk = allocator(chunksize.toint)
        remaining -= chunksize
        javautils.readfully(channel, chunk)
        chunk.flip()
        chunks += chunk
      }
      new chunkedbytebuffer(chunks.toarray)
    }
  }

diskblockmanager

这个类之前也分析过,主要是用来管理spark运行过程中写入的一些临时文件,以及目录的管理。

  • 首先会根据参数配置创建本地目录(可以是逗号分隔的多个目录),参数的优先顺序是:如果是运行在yarn上,则会使用yarn参数local_dirs配置的本地目录;否则获取环境变量spark_local_dirs的值;否则获取spark.local.dir参数的值;最后如果都没有配置,那么就用java系统参数java.io.tmpdir的值作为临时目录。

  • 其次,关于文件在目录之间分配的问题,使用文件名的hash值对目录数量取余的方法来尽量将文件均匀地分配到不同的目录下。

  • 另外一点要说的是文件名的命名规则,是根据不同作用的block来区别命名的,例如rdd缓存写入的block的id就是rddblockid,它的文件名拼接规则是"rdd_" + rddid + "_" + splitindex