欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

spark源代码学习之ContextCleaner清理器

程序员文章站 2022-06-24 20:26:06
spark源代码学习之ContextCleaner清理器,Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据...

spark源代码学习之ContextCleaner清理器,Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生大量的无用数据,最终造成大数据集群崩溃而死。

初始化

ContextCleaner的初始化是在SparkContext中初始化的,这个功能默认是必须开
启的。

  _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

初始化 的时候主要newle一个清理线程

// 清理线程===》很重要
  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

这个清理线程,主要清理了RDD,shuffle,Broadcast,累加器,检查点等数据

 /** Keep cleaning RDD, shuffle, and broadcast state.
    * 保持一个干净的RDD,shuffle和broadcast状态
    *
    * ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际还是那个只是
    * 调用keepCleanning方法。
    * */
  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    // 默认一直为真true
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        // Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.foreach { ref =>
            logDebug("Got cleaning task " + ref.task)
            referenceBuffer.remove(ref)
            // 清除Shuffle和Broadcast相关的数据会分别调用doCleanupShuffle和doCleanupBroadcast函数。根据需要清除数据的类型分别调用
            ref.task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

RDD的清理

 /** Perform RDD cleanup.
    * 在ContextCleaner 中会调用RDD.unpersist()来清除已经持久化的RDD数据
    * */
  def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning RDD " + rddId)
      // 被SparkContext的unpersistRDD方法
      sc.unpersistRDD(rddId, blocking)
      listeners.asScala.foreach(_.rddCleaned(rddId))
      logInfo("Cleaned RDD " + rddId)
    } catch {
      case e: Exception => logError("Error cleaning RDD " + rddId, e)
    }
  }

shuffle的清理

/** Perform shuffle cleanup.
    *
    * 清理Shuffle
    * */
  def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning shuffle " + shuffleId)
      // 把mapOutputTrackerMaster跟踪的shuffle数据不注册(具体做了什么,还没处理)
      mapOutputTrackerMaster.unregisterShuffle(shuffleId)
      // 删除shuffle的块数据
      blockManagerMaster.removeShuffle(shuffleId, blocking)
      listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
      logInfo("Cleaned shuffle " + shuffleId)
    } catch {
      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
    }
  }

广播的清理

/** Perform broadcast cleanup.
    * 清除广播
    * */
  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
    try {
      logDebug(s"Cleaning broadcast $broadcastId")
      // 广播管理器 清除广播
      broadcastManager.unbroadcast(broadcastId, true, blocking)
      listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
      logDebug(s"Cleaned broadcast $broadcastId")
    } catch {
      case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
    }
  }

累加器的清理

/** Perform accumulator cleanup.
    * 清除累加器
    * */
  def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning accumulator " + accId)
      AccumulatorContext.remove(accId)
      listeners.asScala.foreach(_.accumCleaned(accId))
      logInfo("Cleaned accumulator " + accId)
    } catch {
      case e: Exception => logError("Error cleaning accumulator " + accId, e)
    }
  }

检查点的清理

/**
   * Clean up checkpoint files written to a reliable storage.
   * Locally checkpointed files are cleaned up separately through RDD cleanups.
    *
    * 清理记录到可靠存储的检查点文件。
    * 局部检查点文件通过RDD清理被单独清理。
   */
  def doCleanCheckpoint(rddId: Int): Unit = {
    try {
      logDebug("Cleaning rdd checkpoint data " + rddId)
      // 这里直接调用文件系统删除  是本地 就本地删除,是hdfs就hdfs删除
      ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
      listeners.asScala.foreach(_.checkpointCleaned(rddId))
      logInfo("Cleaned rdd checkpoint data " + rddId)
    }
    catch {
      case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
    }
  }
启动方法

在sparkContext中调用启动方法

    _cleaner.foreach(_.start())

这里是启动方法

/** Start the cleaner.
    * 开始清理
    * */
  def start(): Unit = {
    // 设置清理线程为守护进程
    cleaningThread.setDaemon(true)
    // 设置守护进程的名称
    cleaningThread.setName("Spark Context Cleaner")
    // 启动守护进程
    cleaningThread.start()

    // scheduleAtFixedRate 在给定的初始延迟之后,并随后在给定的时间内,创建并执行一个已启用的周期操作
    // periodicGCInterval=30分钟 也就是没=每过30分钟运行一次清理线程清理垃圾
    periodicGCService.scheduleAtFixedRate(new Runnable {
      // 执行系统的垃圾清理
      override def run(): Unit = System.gc()
    }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
  }

这里启动线程 // 启动守护进程 cleaningThread.start(),这里自我感觉一下,因为下面调用System.gc()是清理垃圾,所以这个cleaningThread线程应该是收集那些需要清理的数据,保存它的引用(引用就是一个地址,一个指针,指向要删除的数据),最后调用System.gc()方法,才真正清理。

结束

最后是关闭这个应用的时候,调用Stop()方法

/**
   * Stop the cleaning thread and wait until the thread has finished running its current task.
    * 停止清理线程并等待线程完成其当前任务。
   */
  def stop(): Unit = {
    stopped = true
    // Interrupt the cleaning thread, but wait until the current task has finished before
    // doing so. This guards against the race condition where a cleaning thread may
    // potentially clean similarly named variables created by a different SparkContext,
    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
    // 中断清理线程,但等待当前任务完成后再执行。
    // This guards against the race condition where a cleaning thread may
    // potentially clean similarly named variables created by a different SparkContext,
    // ,导致其他令人费解的块未发现异常(spark-6132)。
    synchronized {
      // 打断线程
      cleaningThread.interrupt()
    }
    // 设置0 等待这个线程死掉
    cleaningThread.join()
    // 关闭垃圾清理
    periodicGCService.shutdown()
  }