Kafka Log Module (7): LogManager's Periodic Tasks
LogManager runs the following periodic tasks, all of them registered in startup.
def startup(): Unit = {
/* Schedule the cleanup task to delete old logs */
if (scheduler != null) {
// 1. Start the kafka-log-retention periodic task, which cleans up log files that are expired or exceed the size limit
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs _,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
// 2. Start the kafka-log-flusher periodic task, which flushes log files to disk
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs _,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
// 3. Start the kafka-recovery-point-checkpoint periodic task, which updates the recovery-point-offset-checkpoint files
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointLogRecoveryOffsets _,
delay = InitialTaskDelayMs,
period = flushRecoveryOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
// 4. Start the kafka-log-start-offset-checkpoint periodic task, which updates the log-start-offset-checkpoint files
scheduler.schedule("kafka-log-start-offset-checkpoint",
checkpointLogStartOffsets _,
delay = InitialTaskDelayMs,
period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
// 5. Start the kafka-delete-logs periodic task, which deletes log directories marked for deletion
scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
deleteLogs _,
delay = InitialTaskDelayMs,
unit = TimeUnit.MILLISECONDS)
}
if (cleanerConfig.enableCleaner)
cleaner.startup()
}
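All of these tasks run on the broker's KafkaScheduler. Note that the kafka-delete-logs registration above passes no period, so the task runs only once and then reschedules itself with a dynamically computed delay (see deleteLogs at the end of this post). The sketch below is illustrative and not part of LogManager; it shows the two scheduling styles, assuming the kafka.utils.KafkaScheduler API of this era of the codebase:
import java.util.concurrent.TimeUnit
import kafka.utils.KafkaScheduler

object SchedulerSketch extends App {
  val scheduler = new KafkaScheduler(threads = 1)
  scheduler.startup()
  // recurring task, like kafka-log-retention: first run after `delay`, then every `period`
  scheduler.schedule("demo-recurring", () => println("recurring tick"),
    delay = 0L, period = 1000L, unit = TimeUnit.MILLISECONDS)
  // one-shot task, like kafka-delete-logs: `period` keeps its default of -1, so it runs exactly once
  scheduler.schedule("demo-once", () => println("one-shot"),
    delay = 500L, unit = TimeUnit.MILLISECONDS)
  Thread.sleep(3000)
  scheduler.shutdown()
}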
The cleanupLogs task
Let's look at cleanupLogs first. This method iterates over all Log objects and performs cleanup along two dimensions: a time dimension, which ensures every LogSegment in a Log is still within its retention period and deletes expired segments; and a space dimension, which ensures a Log does not grow beyond its size limit and deletes the oldest segments that exceed it.
def cleanupLogs(): Unit = {
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
// clean current logs.
// Iterate over the Log object of every topic partition; only Logs configured with cleanup.policy=delete are eligible for deletion
val deletableLogs = {
if (cleaner != null) {
// prevent cleaner from working on same partitions when changing cleanup policy
cleaner.pauseCleaningForNonCompactedPartitions()
} else {
currentLogs.filter {
case (_, log) => !log.config.compact
}
}
}
try {
deletableLogs.foreach {
case (topicPartition, log) =>
debug(s"Garbage collecting '${log.name}'")
// Delete the expired LogSegments of this Log and keep the Log within its allowed size (the retention.bytes config)
total += log.deleteOldSegments()
val futureLog = futureLogs.get(topicPartition)
if (futureLog != null) {
// clean future logs
debug(s"Garbage collecting future log '${futureLog.name}'")
total += futureLog.deleteOldSegments()
}
}
} finally {
if (cleaner != null) {
cleaner.resumeCleaning(deletableLogs.map(_._1))
}
}
debug(s"Log cleanup completed. $total files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
}
def deleteOldSegments(): Int = {
if (config.delete) {
// Check by time, by size, and by baseOffset < logStartOffset.
deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
} else {
deleteLogStartOffsetBreachedSegments()
}
}
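For context, this is roughly what the three breach-check methods look like in the source of this era (a sketch; details vary slightly between versions). Each of them only builds a predicate and a reason string:
private def deleteRetentionMsBreachedSegments(): Int = {
  // retention.ms < 0 means "retain forever"
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds
  // a segment is deletable once its largest timestamp is older than retention.ms
  deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
    reason = s"retention time ${config.retentionMs}ms breach")
}

private def deleteRetentionSizeBreachedSegments(): Int = {
  // retention.bytes < 0 means "no size limit"
  if (config.retentionSize < 0 || size < config.retentionSize) return 0
  var diff = size - config.retentionSize
  // delete the oldest segments until the log fits within retention.bytes again
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else
      false
  }
  deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
}

private def deleteLogStartOffsetBreachedSegments(): Int = {
  // a segment is deletable if the next segment starts at or below logStartOffset,
  // i.e. the whole segment lies below the log start offset
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
    nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
  deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
}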
In the end, all three of these methods call the parameterized version of deleteOldSegments:
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
if (deletable.nonEmpty)
info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
deleteSegments(deletable)
}
}
This method does only two things: it uses the supplied predicate to work out which log segments can be deleted, then calls deleteSegments to delete them.
protected def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
if (segments.isEmpty) {
// If there are no log segments at all, return immediately
Seq.empty
} else {
val deletable = ArrayBuffer.empty[LogSegment]
var segmentEntry = segments.firstEntry
// Iterate from the segment with the smallest base offset and stop as soon as any of the following holds:
// 1. the predicate function returns false
// 2. we reach the segment that contains the Log's high watermark
// 3. the newest segment contains no messages
// The newest segment is the one with the largest key in `segments`, i.e. the active segment. If an empty active
// segment were allowed to be deleted it would have to be recreated right away, so an empty active segment is never deleted.
// Every segment visited before the loop stops (i.e. one that triggers none of the three conditions) is deletable.
while (segmentEntry != null) {
val segment = segmentEntry.getValue
val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
// nextSegmentEntry is the first segment whose base offset is greater than the current one; if there is none, this is the active (last) segment
val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
(nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
else
(null, logEndOffset, segment.size == 0)
// The segment is only deletable if the high watermark is at or above the next segment's base offset,
// i.e. the whole current segment sits below the high watermark.
// The last (active) segment is never deleted if its size is 0.
if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
deletable += segment
segmentEntry = nextSegmentEntry
} else {
segmentEntry = null
}
}
deletable
}
}
Finally, deleteSegments is the method that performs the actual segment deletion.
protected def deleteSegments(deletable: Iterable[LogSegment]): Int = {
maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
val numToDelete = deletable.size
if (numToDelete > 0) {
// We must always keep at least one segment, so if every segment is about to be deleted, roll a new one first and then delete the old ones
if (segments.size == numToDelete)
roll()
lock synchronized {
// Make sure the Log's memory-mapped buffers have not been closed
checkIfMemoryMappedBufferClosed()
// remove the segments for lookups
removeAndDeleteSegments(deletable, asyncDelete = true)
// Try to advance the Log Start Offset.
// Deleting segments may change the range of messages visible to consumers, so we need to check whether the
// Log Start Offset has to be advanced; this is why deleteSegments ends by updating it.
maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
}
}
numToDelete
}
}
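removeAndDeleteSegments(deletable, asyncDelete = true) does not touch the files synchronously: the segments are removed from the in-memory segments map, their files are renamed with a .deleted suffix, and the physical deletion is scheduled to run file.delete.delay.ms later. A rough sketch of the file-deletion half (approximating the source; names and signatures may differ by version):
private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = {
  // rename the log and index files of these segments with the ".deleted" suffix first
  segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))

  def deleteSegments(): Unit = {
    info(s"Deleting segments $segments")
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      segments.foreach(_.deleteIfExists())
    }
  }

  if (asyncDelete) {
    // the physical delete happens file.delete.delay.ms later on the scheduler
    info(s"Scheduling segments for deletion $segments")
    scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
  } else {
    deleteSegments()
  }
}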
The kafka-log-flusher task
This task periodically flushes log files to disk. The logic lives in LogManager#flushDirtyLogs, which iterates over the Log object of every topic partition and compares the timestamp of the last flush recorded in the Log with the current time; if the difference exceeds the threshold (the flush.ms config), it calls Log#flush.
private def flushDirtyLogs(): Unit = {
debug("Checking for dirty logs to flush...")
for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
} catch {
case e: Throwable =>
error(s"Error flushing topic ${topicPartition.topic}", e)
}
}
}
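Log#flush fsyncs every segment between the current recoveryPoint and the target offset and then advances recoveryPoint, which is exactly the value that the kafka-recovery-point-checkpoint task below persists. A sketch of the flush logic, approximating the source of this version:
def flush(): Unit = flush(this.logEndOffset)

def flush(offset: Long): Unit = {
  maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
    // nothing to do if everything below `offset` is already known to be on disk
    if (offset <= this.recoveryPoint)
      return
    // fsync every segment in the range [recoveryPoint, offset)
    for (segment <- logSegments(this.recoveryPoint, offset))
      segment.flush()
    lock synchronized {
      checkIfMemoryMappedBufferClosed()
      if (offset > this.recoveryPoint) {
        // advance the recovery point and record the flush time used by flushDirtyLogs
        this.recoveryPoint = offset
        lastFlushedTime.set(time.milliseconds)
      }
    }
  }
}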
The kafka-recovery-point-checkpoint task
This task periodically updates the recovery-point-offset-checkpoint file in each log directory; the logic lives in checkpointLogRecoveryOffsets.
def checkpointLogRecoveryOffsets(): Unit = {
logsByDir.foreach { case (dir, partitionToLogMap) =>
liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)
}
}
}
private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
try {
checkpointLogRecoveryOffsetsInDir(dir)
logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
s"file in directory $dir", e)
}
}
private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
for {
// Get the Map[TopicPartition, Log] for this log directory
partitionToLog <- logsByDir.get(dir.getAbsolutePath)
checkpoint <- recoveryPointCheckpoints.get(dir)
} {
// Write out the corresponding recovery-point-offset-checkpoint file
checkpoint.write(partitionToLog.map { case (tp, log) => tp -> log.recoveryPoint })
}
}
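The checkpoint itself is a small plain-text file. As an illustration with hypothetical topic names, a recovery-point-offset-checkpoint file holds a format version, an entry count, and then one "topic partition offset" line per partition:
0
2
my-topic 0 1024
my-topic 1 2048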
The kafka-log-start-offset-checkpoint task
This task periodically updates the log-start-offset-checkpoint file in each log directory.
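The entry point scheduled in startup is checkpointLogStartOffsets, which in this version (a sketch) simply walks the live log directories:
def checkpointLogStartOffsets(): Unit = {
  // write a log-start-offset-checkpoint file into every live log directory
  liveLogDirs.foreach(checkpointLogStartOffsetsInDir)
}
The per-directory work is then done by checkpointLogStartOffsetsInDir: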
private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
for {
partitionToLog <- logsByDir.get(dir.getAbsolutePath)
// Look up the logStartOffset checkpoint for this directory
checkpoint <- logStartOffsetCheckpoints.get(dir)
} {
try {
val logStartOffsets = partitionToLog.collect {
case (k, log) if log.logStartOffset > log.logSegments.head.baseOffset => k -> log.logStartOffset
}
checkpoint.write(logStartOffsets)
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e)
}
}
}
The kafka-delete-logs task
This task periodically checks for and deletes log directories that have been marked for deletion.
/**
* Delete logs marked for deletion. Delete all logs for which `currentDefaultConfig.fileDeleteDelayMs`
* has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
* considered for deletion in the next iteration of `deleteLogs`. The next iteration will be executed
* after the remaining time for the first log that is not deleted. If there are no more `logsToBeDeleted`,
* `deleteLogs` will be executed after `currentDefaultConfig.fileDeleteDelayMs`.
* When LogManager starts up (LogManager#startup), it registers a periodic task named kafka-delete-logs,
* which repeatedly calls LogManager#deleteLogs to delete the log directories whose names carry the "-delete" suffix.
*/
private def deleteLogs(): Unit = {
var nextDelayMs = 0L
try {
def nextDeleteDelayMs: Long = {
if (!logsToBeDeleted.isEmpty) {
val (_, scheduleTimeMs) = logsToBeDeleted.peek()
scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
} else
currentDefaultConfig.fileDeleteDelayMs
}
// As long as the log at the head of the queue is already due for deletion
while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
val (removedLog, _) = logsToBeDeleted.take()
if (removedLog != null) {
try {
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
case e: KafkaStorageException =>
error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
}
}
}
} catch {
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
} finally {
try {
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
delay = nextDelayMs,
unit = TimeUnit.MILLISECONDS)
} catch {
case e: Throwable =>
if (scheduler.isStarted) {
// No errors should occur unless scheduler has been shutdown
error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
}
}
}
}
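For completeness, logs end up in the logsToBeDeleted queue when LogManager removes them asynchronously, for example when a partition is deleted or a replica is moved to another log directory: the partition's directory is renamed with a "-delete" suffix and the Log is enqueued together with the current timestamp, which is what deleteLogs later compares against file.delete.delay.ms. A minimal sketch of the enqueue side, approximating the source:
private def addLogToBeDeleted(log: Log): Unit = {
  // record when the log was scheduled for deletion so that deleteLogs()
  // can wait out file.delete.delay.ms before physically removing it
  this.logsToBeDeleted.add((log, time.milliseconds()))
}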