2022-04-01 20:48:05
_cleaner = if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) } else { None } _cleaner.foreach(_.start())
初始化 的时候主要newle一个清理线程
// 清理线程===》很重要 private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
/** Keep cleaning RDD, shuffle, and broadcast state. * 保持一个干净的RDD,shuffle和broadcast状态 * * ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际还是那个只是 * 调用keepCleanning方法。 * */ private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { // 默认一直为真true while (!stopped) { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) .map(_.asInstanceOf[CleanupTaskWeakReference]) // Synchronize here to avoid being interrupted on stop() synchronized { reference.foreach { ref => logDebug("Got cleaning task " + ref.task) referenceBuffer.remove(ref) // 清除Shuffle和Broadcast相关的数据会分别调用doCleanupShuffle和doCleanupBroadcast函数。根据需要清除数据的类型分别调用 ref.task match { case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) case CleanAccum(accId) => doCleanupAccum(accId, blocking = blockOnCleanupTasks) case CleanCheckpoint(rddId) => doCleanCheckpoint(rddId) } } } } catch { case ie: InterruptedException if stopped => // ignore case e: Exception => logError("Error in cleaning thread", e) } } }
/** Perform RDD cleanup. * 在ContextCleaner 中会调用RDD.unpersist()来清除已经持久化的RDD数据 * */ def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = { try { logDebug("Cleaning RDD " + rddId) // 被SparkContext的unpersistRDD方法 sc.unpersistRDD(rddId, blocking) listeners.asScala.foreach(_.rddCleaned(rddId)) logInfo("Cleaned RDD " + rddId) } catch { case e: Exception => logError("Error cleaning RDD " + rddId, e) } }
/** Perform shuffle cleanup. * * 清理Shuffle * */ def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = { try { logDebug("Cleaning shuffle " + shuffleId) // 把mapOutputTrackerMaster跟踪的shuffle数据不注册(具体做了什么,还没处理) mapOutputTrackerMaster.unregisterShuffle(shuffleId) // 删除shuffle的块数据 blockManagerMaster.removeShuffle(shuffleId, blocking) listeners.asScala.foreach(_.shuffleCleaned(shuffleId)) logInfo("Cleaned shuffle " + shuffleId) } catch { case e: Exception => logError("Error cleaning shuffle " + shuffleId, e) } }
/** Perform broadcast cleanup. * 清除广播 * */ def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = { try { logDebug(s"Cleaning broadcast $broadcastId") // 广播管理器 清除广播 broadcastManager.unbroadcast(broadcastId, true, blocking) listeners.asScala.foreach(_.broadcastCleaned(broadcastId)) logDebug(s"Cleaned broadcast $broadcastId") } catch { case e: Exception => logError("Error cleaning broadcast " + broadcastId, e) } }
/** Perform accumulator cleanup. * 清除累加器 * */ def doCleanupAccum(accId: Long, blocking: Boolean): Unit = { try { logDebug("Cleaning accumulator " + accId) AccumulatorContext.remove(accId) listeners.asScala.foreach(_.accumCleaned(accId)) logInfo("Cleaned accumulator " + accId) } catch { case e: Exception => logError("Error cleaning accumulator " + accId, e) } }
/** * Clean up checkpoint files written to a reliable storage. * Locally checkpointed files are cleaned up separately through RDD cleanups. * * 清理记录到可靠存储的检查点文件。 * 局部检查点文件通过RDD清理被单独清理。 */ def doCleanCheckpoint(rddId: Int): Unit = { try { logDebug("Cleaning rdd checkpoint data " + rddId) // 这里直接调用文件系统删除 是本地 就本地删除,是hdfs就hdfs删除 ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId) listeners.asScala.foreach(_.checkpointCleaned(rddId)) logInfo("Cleaned rdd checkpoint data " + rddId) } catch { case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e) } }启动方法
/** Start the cleaner. * 开始清理 * */ def start(): Unit = { // 设置清理线程为守护进程 cleaningThread.setDaemon(true) // 设置守护进程的名称 cleaningThread.setName("Spark Context Cleaner") // 启动守护进程 cleaningThread.start() // scheduleAtFixedRate 在给定的初始延迟之后,并随后在给定的时间内,创建并执行一个已启用的周期操作 // periodicGCInterval=30分钟 也就是没=每过30分钟运行一次清理线程清理垃圾 periodicGCService.scheduleAtFixedRate(new Runnable { // 执行系统的垃圾清理 override def run(): Unit = System.gc() }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS) }
这里启动线程 // 启动守护进程 cleaningThread.start(),这里自我感觉一下,因为下面调用System.gc()是清理垃圾,所以这个cleaningThread线程应该是收集那些需要清理的数据,保存它的引用(引用就是一个地址,一个指针,指向要删除的数据),最后调用System.gc()方法,才真正清理。
/** * Stop the cleaning thread and wait until the thread has finished running its current task. * 停止清理线程并等待线程完成其当前任务。 */ def stop(): Unit = { stopped = true // Interrupt the cleaning thread, but wait until the current task has finished before // doing so. This guards against the race condition where a cleaning thread may // potentially clean similarly named variables created by a different SparkContext, // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132). // 中断清理线程,但等待当前任务完成后再执行。 // This guards against the race condition where a cleaning thread may // potentially clean similarly named variables created by a different SparkContext, // ,导致其他令人费解的块未发现异常(spark-6132)。 synchronized { // 打断线程 cleaningThread.interrupt() } // 设置0 等待这个线程死掉 cleaningThread.join() // 关闭垃圾清理 periodicGCService.shutdown() }