欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kafka logManager类 kafka存储机制

程序员文章站 2022-07-14 14:12:07
...

  logManager类:管理kafka数据log的类,包括数据clean,flush等操作

   Log类:每个tplog的对象

      logSegment:每个tplog目录下的文件对象

          FileMessageSet:封装每个log文件的文件通道(FileChannel)的类

          base offset:该segment第一条消息在partition中的绝对offset值(log文件名就是这个start offset)

          OffsetIndex:每个索引文件的内存映射(channel map)类,存储相对offset值和对应的log文件position

 

   topic按照partition进行分区,各个partition分发到不同的机器(broker)上

   partition上有多个log文件,每个log文件一个索引文件

   log文件是实际的数据;索引文件记录的是log文件里数据的相对偏移量(offset)以及它在log文件里的position。索引是稀疏的:每隔一段数据才生成一条offset索引项,避免索引文件过大

 

1.初始化:

// Name of the checkpoint file created in each log directory (see recoveryPointCheckpoints below).
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  // NOTE(review): presumably the file that lockLogDirs locks in each directory — confirm against its definition.
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000
  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicAndPartition, Log]()// all Log objects: one Log per topic-partition

  // Validate the configured log directories (and, judging by the name, create missing ones), then lock them.
  createAndValidateLogDirs(logDirs)
  private val dirLocks = lockLogDirs(logDirs)
  // One OffsetCheckpoint per log directory, keyed by that directory.
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  // Walk the log directories and build the Log objects (see loadLogs).
  loadLogs()

主要方法loadLogs:

if (cleanShutdownFile.exists) {// the clean-shutdown marker exists, so recovery is skipped for this data directory
        debug(
          "Found clean shutdown file. " +
          "Skipping recovery for all logs in data directory: " +
          dir.getAbsolutePath)
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        brokerState.newState(RecoveringFromUncleanShutdown)
      }

      // Read the recovery points stored in this directory's checkpoint file
      val recoveryPoints = this.recoveryPointCheckpoints(dir).read

      // Build one runnable per sub-directory of `dir`; Option(...) guards against listFiles returning null
      val jobsForDir = for {
        dirContent <- Option(dir.listFiles).toList
        logDir <- dirContent if logDir.isDirectory
      } yield {
        Utils.runnable {
          debug("Loading log '" + logDir.getName + "'")
          // Derive the topic and partition from the directory name
          val topicPartition = Log.parseTopicPartitionName(logDir.getName)
          // Use the topic's own config if one is registered, otherwise fall back to defaultConfig
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
          // No checkpointed recovery point for this partition means starting from offset 0
          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)

          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
          val previous = this.logs.put(topicPartition, current)
          // A non-null previous entry means two directories map to the same topic+partition
          if (previous != null) {
            throw new IllegalArgumentException(
              "Duplicate log directories found: %s, %s!".format(
              current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
          }
        }
      }
      // Submit the runnables built above to the pool; each one creates a Log and registers it in `logs`
      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq

  其中new Log方法,为初始化log file和index

 主方法:loadSegments

   1.处理swap文件,log则重新加载(rename),index则删除

   2.加载log和index,恢复不存在的index

/**
 * Load the log segments found in this log's directory.
 *
 * Works in two passes over the files in `dir`:
 *  1. delete files carrying the deleted/cleaned suffix and finish any interrupted swap
 *     (a swapped log file is renamed into place and its index removed; a swapped index is removed);
 *  2. drop index files that have no matching log file, and create a LogSegment for every
 *     log file, rebuilding the index when it is missing.
 *
 * If no segment was found, an empty segment starting at offset 0 is created; otherwise
 * recoverLog() is run and the active segment's index is resized to the configured maximum.
 * Finally every segment's index is sanity-checked.
 *
 * @throws IOException if a file in the directory cannot be read
 * @throws KafkaException if a swap file cannot be renamed into place
 */
private def loadSegments(): Unit = {
    // create the log directory if it doesn't exist
    dir.mkdirs()

    // first do a pass through the files in the log directory and remove any temporary files
    // and complete any interrupted swap operations
    for(file <- dir.listFiles if file.isFile) {
      if(!file.canRead)
        throw new IOException("Could not read file " + file)
      val filename = file.getName
      if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
        // if the file ends in .deleted or .cleaned, delete it
        file.delete()
      } else if(filename.endsWith(SwapFileSuffix)) {
        // we crashed in the middle of a swap operation, to recover:
        // if a log, swap it in and delete the .index file
        // if an index just delete it, it will be rebuilt
        val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
        if(baseName.getPath.endsWith(IndexFileSuffix)) {
          file.delete()
        } else if(baseName.getPath.endsWith(LogFileSuffix)){
          // delete the index
          val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
          index.delete()
          // complete the swap operation
          val renamed = file.renameTo(baseName)
          if(renamed)
            info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
          else
            throw new KafkaException("Failed to rename file %s.".format(file.getPath))
        }
      }
    }

    // now do a second pass and load all the .log and .index files
    for(file <- dir.listFiles if file.isFile) {
      val filename = file.getName
      if(filename.endsWith(IndexFileSuffix)) {
        // if it is an index file, make sure it has a corresponding .log file; otherwise delete it.
        // Only the trailing suffix is swapped: String.replace would also rewrite any earlier
        // occurrence of the index suffix in the path (for example inside a directory name),
        // yielding a wrong log path and causing a valid index to be deleted as orphaned.
        val logFile = new File(Utils.replaceSuffix(file.getAbsolutePath, IndexFileSuffix, LogFileSuffix))
        if(!logFile.exists) {
          warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
          file.delete()
        }
      } else if(filename.endsWith(LogFileSuffix)) {
        // if its a log file, load the corresponding log segment;
        // the file name (minus the suffix) is the segment's start offset
        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
        // must be checked before the LogSegment is constructed below
        val hasIndex = Log.indexFilename(dir, start).exists
        // build the segment object for this log file
        val segment = new LogSegment(dir = dir,
                                     startOffset = start,
                                     indexIntervalBytes = config.indexInterval,
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time)
        if(!hasIndex) {
          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
          // rebuild the index for this segment
          segment.recover(config.maxMessageSize)
        }
        segments.put(start, segment)
      }
    }

    if(logSegments.size == 0) {
      // no existing segments, create a new mutable segment beginning at offset 0
      segments.put(0L, new LogSegment(dir = dir,
                                     startOffset = 0,
                                     indexIntervalBytes = config.indexInterval,
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time))
    } else {
      recoverLog()
      // reset the index size of the currently active log segment to allow more entries
      activeSegment.index.resize(config.maxIndexSize)
    }

    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
    for (s <- logSegments)
      s.index.sanityCheck()
  }

 -----------------------------初始化完毕---------------------------------

 

startup方法中三个功能:

1.cleanupLogs

2.flushDirtyLogs

3.checkpointRecoveryPointOffsets

 

1.cleanupLogs

 包含两个方法:一个按时间清理(依据segment的最后修改时间判断是否超时),一个按大小清理(从最老的segment开始,只要segment大小不超过剩余待删除量diff就删除)

  /**
   * Delete the segments of the given log whose last-modified time is more than
   * log.config.retentionMs before the moment this method started.
   *
   * @return the value returned by log.deleteOldSegments (the number of segments deleted)
   */
  private def cleanupExpiredSegments(log: Log): Int = {
    val now = time.milliseconds
    // a segment is expired once its age, measured from its last modification, exceeds the retention time
    def isExpired(segment: LogSegment): Boolean = {
      val ageMs = now - segment.lastModified
      ageMs > log.config.retentionMs
    }
    log.deleteOldSegments(isExpired)
  }
  /**
   * Trim the log back towards log.config.retentionSize bytes.
   *
   * Nothing is deleted when retentionSize is negative or the log is smaller than it.
   * Otherwise segments are offered to log.deleteOldSegments, and a segment is accepted
   * only while it fits entirely inside the remaining excess, so the log is never
   * shrunk below retentionSize.
   *
   * @return 0 when nothing needs trimming, otherwise the value returned by log.deleteOldSegments
   */
  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize) {
      // size-based retention is disabled, or the log is still under the limit
      0
    } else {
      // bytes by which the log currently exceeds the limit; reduced as segments are accepted
      var excessBytes = log.size - log.config.retentionSize
      log.deleteOldSegments { segment =>
        val fitsInExcess = excessBytes - segment.size >= 0
        if(fitsInExcess)
          excessBytes -= segment.size
        fitsInExcess
      }
    }
  }

  参数:

  日志清理检查的执行间隔:每隔该时间检查一次,segment距离上次修改时间大于配置的retention时间,则删除

  // Read from "log.retention.check.interval.ms" with 5*60*1000 (5 minutes) as the second argument,
  // presumably the default; (1, Long.MaxValue) is presumably the accepted range — confirm against getLongInRange.
  val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))

  按大小清理的参数:log允许保留的大小上限(字节),默认-1;配置小于0时不按大小清理

  // Read from "log.retention.bytes"; the second argument (-1) is presumably the default.
  // NOTE(review): presumably the source of LogConfig.retentionSize, where a negative value disables size-based cleanup — confirm.
  val logRetentionBytes = props.getLong("log.retention.bytes", -1)

 

  /**
   * Delete the leading run of segments that satisfy the given predicate.
   *
   * Segments are examined in the order logSegments yields them and the scan stops at the
   * first one that is not deletable. The active segment is never deleted while it is empty,
   * since it would just be re-created.
   *
   * @param predicate decides whether a segment should be deleted
   * @return the number of segments selected for deletion
   */
  def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
    val active = activeSegment
    // the predicate is evaluated first; the empty-active-segment exclusion is only checked when it holds
    def isDeletable(segment: LogSegment): Boolean =
      predicate(segment) && !(segment.baseOffset == active.baseOffset && segment.size <= 0)

    val doomed = logSegments.takeWhile(isDeletable)
    val numToDelete = doomed.size
    if(numToDelete > 0) {
      lock synchronized {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        if(segments.size == numToDelete)
          roll()
        // hand each selected segment to deleteSegment
        for(segment <- doomed)
          deleteSegment(segment)
      }
    }
    numToDelete
  }

 

 2.flushDirtyLogs

flush由message条数和时间间隔两个条件控制,下面是时间间隔的参数:
    /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk;
     * read from "log.flush.interval.ms", with logFlushSchedulerIntervalMs as the second argument
     * (presumably the default when the property is unset — confirm against getLong) */
  val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs)
  
  /**
   * Walk over every log in `logs` and flush those whose time since the last flush
   * has reached their configured flushMs.
   *
   * A failure while handling one log is logged and does not stop the remaining logs
   * from being checked.
   */
  private def flushDirtyLogs() = {
    debug("Checking for dirty logs to flush...")

    logs.foreach { case (topicAndPartition, log) =>
      try {
        val elapsedMs = time.milliseconds - log.lastFlushTime
        debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval  " + log.config.flushMs +
              " last flushed " + log.lastFlushTime + " time since last flush: " + elapsedMs)
        val flushDue = elapsedMs >= log.config.flushMs
        if(flushDue)
          log.flush
      } catch {
        // deliberately broad: report the error and carry on with the next log
        case e: Throwable =>
          error("Error flushing topic " + topicAndPartition.topic, e)
      }
    }
  }
  
    /**
     * Flush this segment: first the log, then the index.
     * Both flushes run inside LogFlushStats.logFlushTimer.time.
     */
    @threadsafe
  def flush(): Unit = {
    LogFlushStats.logFlushTimer.time {
      // keep this order: the log is flushed before the index
      log.flush()
      index.flush()
    }
  }

 

3.checkpointRecoveryPointOffsets

checkpointRecoveryPointOffsets:在每个logdir上记录各个log的恢复点(recovery point),避免启动时需要恢复所有log并重新生成index

 

是按照logdir遍历,logdir中包含多个tplog

 

  /**
   * Make a checkpoint for all logs in provided directory.
   *
   * Looks up the logs registered under dir.toString in logsByDir; when there are any,
   * writes each log's recoveryPoint, keyed as in that map, to the directory's checkpoint.
   * Does nothing when the directory has no entry.
   */
  private def checkpointLogsInDir(dir: File): Unit = {
    // the entry for this directory, if any, maps each topic-partition to its Log
    this.logsByDir.get(dir.toString).foreach { logsInDir =>
      // mapValues swaps every Log for its recovery point before the checkpoint is written
      this.recoveryPointCheckpoints(dir).write(logsInDir.mapValues(_.recoveryPoint))
    }
  }

 

 logmanager里实现log compact功能

    // Start the log cleaner only when it is enabled in the cleaner config
    if(cleanerConfig.enableCleaner)
      cleaner.startup()// log compaction