- spark.memory.fraction,默认值0.6,这个参数控制spark内存管理器管理的内存占堆内存的比例(准确地说是:堆内存-300m,这300m是spark为系统预留的内存),也就是说执行内存和存储内存加起来只有(堆内存-300m)的0.6,剩余的0.4是用于用户代码执行过程中的内存占用,比如你的代码中可能会加载一些较大的文件到内存中,或者做一些排序,用户代码使用的内存并不受内存管理器管理,所以需要预留一定的比例。
- spark.memory.storageFraction,默认值0.5,顾名思义,这个值决定了存储内存的占比,注意是占内存管理器管理的那部分内存的比例,剩余的部分用作执行内存。例如,默认情况下,存储内存占堆内存的比例是0.6 * 0.5 = 0.3(当然准确地说是占堆内存-300m的比例)。
maxonheapstoragememory maxoffheapstoragememory setmemorystore acquirestoragememory acquireunrollmemory acquireexecutionmemory releaseexecutionmemory releaseallexecutionmemoryfortask releasestoragememory releaseallstoragememory releaseunrollmemory executionmemoryused storagememoryused getexecutionmemoryusagefortask
// One storage pool and one execution pool are created for each memory mode
// (on_heap / off_heap). Every pool is constructed with this manager and its memory mode.
protected val onheapstoragememorypool = new storagememorypool(this, memorymode.on_heap)
protected val offheapstoragememorypool = new storagememorypool(this, memorymode.off_heap)
protected val onheapexecutionmemorypool = new executionmemorypool(this, memorymode.on_heap)
protected val offheapexecutionmemorypool = new executionmemorypool(this, memorymode.off_heap)
// Grow each pool by its share of memory. The off-heap execution pool receives whatever
// part of maxoffheapmemory is not assigned to off-heap storage.
// NOTE(review): presumably executed once while the memory manager is constructed - confirm.
onheapstoragememorypool.incrementpoolsize(onheapstoragememory)
onheapexecutionmemorypool.incrementpoolsize(onheapexecutionmemory)
offheapexecutionmemorypool.incrementpoolsize(maxoffheapmemory - offheapstoragememory)
offheapstoragememorypool.incrementpoolsize(offheapstoragememory)
/**
 * Release `numbytes` of execution memory held by the given task.
 *
 * The request is forwarded to the on-heap or off-heap execution pool, depending on
 * `memorymode`.
 */
private[memory] def releaseexecutionmemory(
    numbytes: long,
    taskattemptid: long,
    memorymode: memorymode): unit = synchronized {
  // Select the pool first, then issue a single release call against it.
  val targetpool = memorymode match {
    case memorymode.on_heap => onheapexecutionmemorypool
    case memorymode.off_heap => offheapexecutionmemorypool
  }
  targetpool.releasememory(numbytes, taskattemptid)
}
/**
 * Release `numbytes` of memory held by the given task in this pool.
 *
 * If the task holds less than `numbytes`, a warning is logged and only the amount the task
 * actually holds is released. Waiting threads are notified afterwards in every case.
 *
 * @param numbytes      number of bytes the caller wants to give back
 * @param taskattemptid id of the task attempt releasing the memory
 */
def releasememory(numbytes: long, taskattemptid: long): unit = lock.synchronized {
  // Look up how much memory this task currently holds in the bookkeeping map (0 if absent).
  val curmem = memoryfortask.getorelse(taskattemptid, 0l)
  // Never release more than the task holds; warn when the caller asks for more.
  // This value is computed once and never reassigned, so it is a `val`.
  val memorytofree = if (curmem < numbytes) {
    logwarning(
      s"internal error: release called on $numbytes bytes but task only has $curmem bytes " +
        s"of memory from the $poolname pool")
    curmem
  } else {
    numbytes
  }
  if (memoryfortask.contains(taskattemptid)) {
    // Update the bookkeeping entry for this task.
    memoryfortask(taskattemptid) -= memorytofree
    // Drop the entry entirely once the task no longer holds any memory.
    if (memoryfortask(taskattemptid) <= 0) {
      memoryfortask.remove(taskattemptid)
    }
  }
  // Other tasks may be blocked waiting for execution memory, so wake them up.
  lock.notifyall() // notify waiters in acquirememory() that memory has been freed
}
/**
 * Release all execution memory held by the given task in both execution pools.
 *
 * @return the sum of the values returned by the on-heap and off-heap pools
 */
private[memory] def releaseallexecutionmemoryfortask(taskattemptid: long): long = synchronized {
  // The on-heap pool is asked first, then the off-heap pool; the two results are added.
  val releasedonheap = onheapexecutionmemorypool.releaseallmemoryfortask(taskattemptid)
  val releasedoffheap = offheapexecutionmemorypool.releaseallmemoryfortask(taskattemptid)
  releasedonheap + releasedoffheap
}
/**
 * Release `numbytes` of storage memory from the pool that matches `memorymode`.
 */
def releasestoragememory(numbytes: long, memorymode: memorymode): unit = synchronized {
  // Select the pool first, then issue a single release call against it.
  val targetpool = memorymode match {
    case memorymode.on_heap => onheapstoragememorypool
    case memorymode.off_heap => offheapstoragememorypool
  }
  targetpool.releasememory(numbytes)
}
/**
 * Release `numbytes` of unroll memory. This simply delegates to `releasestoragememory`
 * with the same arguments.
 */
final def releaseunrollmemory(numbytes: long, memorymode: memorymode): unit =
  synchronized(releasestoragememory(numbytes, memorymode))
/**
 * Try to acquire `numbytes` of execution memory for the given task.
 *
 * The pools and limits matching `memorymode` are selected, and the request is handed to the
 * execution pool together with two callbacks: one that may grow the execution pool by
 * shrinking the storage pool, and one that computes the largest size the execution pool
 * could reach. The result of the pool's `acquirememory` call is returned.
 */
override private[memory] def acquireexecutionmemory(
    numbytes: long,
    taskattemptid: long,
    memorymode: memorymode): long = synchronized {
  // Sanity-check the pool sizes and the requested amount.
  assertinvariants()
  assert(numbytes >= 0)
  // Pick the pools and sizes to use depending on on-heap vs. off-heap memory.
  val (executionpool, storagepool, storageregionsize, maxmemory) = memorymode match {
    case memorymode.on_heap => (
      onheapexecutionmemorypool,
      onheapstoragememorypool,
      onheapstorageregionsize,
      maxheapmemory)
    case memorymode.off_heap => (
      offheapexecutionmemorypool,
      offheapstoragememorypool,
      offheapstoragememory,
      maxoffheapmemory)
  }

  /**
   * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
   *
   * When acquiring memory for a task, the execution pool may need to make multiple
   * attempts. Each attempt must be able to evict storage in case another task jumps in
   * and caches a large block between the attempts. This is called once per attempt.
   */
  def maybegrowexecutionpool(extramemoryneeded: long): unit = {
    if (extramemoryneeded > 0) {
      // There is not enough free memory in the execution pool, so try to reclaim memory from
      // storage. We can reclaim any free memory from the storage pool. If the storage pool
      // has grown to become larger than `storageregionsize`, we can evict blocks and reclaim
      // the memory that storage has borrowed from execution.
      val memoryreclaimablefromstorage = math.max(
        storagepool.memoryfree,
        storagepool.poolsize - storageregionsize)
      if (memoryreclaimablefromstorage > 0) {
        // Only reclaim as much space as is necessary and available. The storage pool may
        // evict blocks from memory in order to free this space.
        val spacetoreclaim = storagepool.freespacetoshrinkpool(
          math.min(extramemoryneeded, memoryreclaimablefromstorage))
        // Update the bookkeeping: the storage pool shrinks by the reclaimed amount and the
        // execution pool grows by the same amount.
        storagepool.decrementpoolsize(spacetoreclaim)
        executionpool.incrementpoolsize(spacetoreclaim)
      }
    }
  }

  /**
   * The size the execution pool would have after evicting storage memory.
   *
   * The execution memory pool divides this quantity among the active tasks evenly to cap
   * the execution memory allocation for each task. It is important to keep this greater
   * than the execution pool size, which doesn't take into account potential memory that
   * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
   *
   * Additionally, this quantity should be kept below `maxmemory` to arbitrate fairness
   * in execution memory allocation across tasks, otherwise, a task may occupy more than
   * its fair share of execution memory, mistakenly thinking that other tasks can acquire
   * the portion of storage memory that cannot be evicted.
   */
  def computemaxexecutionpoolsize(): long = {
    maxmemory - math.min(storagepool.memoryused, storageregionsize)
  }

  executionpool.acquirememory(
    numbytes, taskattemptid, maybegrowexecutionpool, () => computemaxexecutionpoolsize)
}
/**
 * Free up to `spacetofree` bytes so that the caller can shrink this pool by that amount.
 *
 * Unused memory in the pool is counted first; blocks are evicted from the memory store
 * only for the part that unused memory cannot cover.
 *
 * @return free memory counted towards the request plus whatever the eviction call reports
 */
def freespacetoshrinkpool(spacetofree: long): long = lock.synchronized {
  val fromunusedmemory = math.min(spacetofree, memoryfree)
  val stillneeded = spacetofree - fromunusedmemory
  if (stillneeded <= 0) {
    // Free memory alone covers the request; no eviction is necessary.
    fromunusedmemory
  } else {
    // When a block is released, blockmanager.dropfrommemory() calls releasememory(), so we do
    // not need to decrement _memoryused here. The caller still has to decrement the pool size.
    fromunusedmemory + memorystore.evictblockstofreespace(none, stillneeded, memorymode)
  }
}
/**
 * Try to evict blocks to free up at least `space` bytes of memory of the given mode.
 *
 * Candidate blocks are write-locked without blocking while the entry map is scanned. Blocks
 * are only dropped when the selected candidates together cover `space`; otherwise every
 * lock taken here is released again and nothing is evicted.
 *
 * @param blockid    the block we are making room for, if any; when it belongs to an rdd,
 *                   other blocks of that same rdd are not considered for eviction
 * @param space      number of bytes to free; must be positive
 * @param memorymode only entries stored in this memory mode are candidates
 * @return the total size of the selected blocks when eviction went ahead, otherwise 0
 */
private[spark] def evictblockstofreespace(
    blockid: option[blockid],
    space: long,
    memorymode: memorymode): long = {
  assert(space > 0)
  memorymanager.synchronized {
    var freedmemory = 0l
    val rddtoadd = blockid.flatmap(getrddid)
    val selectedblocks = new arraybuffer[blockid]
    def blockisevictable(blockid: blockid, entry: memoryentry[_]): boolean = {
      entry.memorymode == memorymode && (rddtoadd.isempty || rddtoadd != getrddid(blockid))
    }
    // This is synchronized to ensure that the set of entries is not changed
    // (because of getvalue or getbytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entryset().iterator()
      while (freedmemory < space && iterator.hasnext) {
        // Advance the iterator; without this call the loop has no current entry to inspect.
        val pair = iterator.next()
        val blockid = pair.getkey
        val entry = pair.getvalue
        if (blockisevictable(blockid, entry)) {
          // We don't want to evict blocks which are currently being read, so we need to obtain
          // an exclusive write lock on blocks which are candidates for eviction. We perform a
          // non-blocking "trylock" here in order to ignore blocks which are locked for reading
          // (or writing) by someone else.
          if (blockinfomanager.lockforwriting(blockid, blocking = false).isdefined) {
            selectedblocks += blockid
            freedmemory += pair.getvalue.size
          }
        }
      }
    }

    def dropblock[t](blockid: blockid, entry: memoryentry[t]): unit = {
      val data = entry match {
        case deserializedmemoryentry(values, _, _) => left(values)
        case serializedmemoryentry(buffer, _, _) => right(buffer)
      }
      // Ask the eviction handler to drop the block from memory; the returned storage level
      // tells us whether the block still lives in some store afterwards.
      val neweffectivestoragelevel =
        blockevictionhandler.dropfrommemory(blockid, () => data)(entry.classtag)
      if (neweffectivestoragelevel.isvalid) {
        // The block is still present in at least one store, so release the write lock taken
        // during selection but don't delete the block info.
        blockinfomanager.unlock(blockid)
      } else {
        // The block isn't present in any store, so delete the block info so that the
        // block can be stored again.
        blockinfomanager.removeblock(blockid)
      }
    }

    // Only actually drop blocks when the selected ones free at least the requested space.
    if (freedmemory >= space) {
      var lastsuccessfulblock = -1
      try {
        loginfo(s"${selectedblocks.size} blocks selected for dropping " +
          s"(${utils.bytestostring(freedmemory)} bytes)")
        (0 until selectedblocks.size).foreach { idx =>
          val blockid = selectedblocks(idx)
          val entry = entries.synchronized {
            entries.get(blockid)
          }
          // This should never be null as only one task should be dropping
          // blocks and removing entries. However the check is still here for
          // future safety.
          if (entry != null) {
            dropblock(blockid, entry)
            // This is a hook left in place for tests.
            afterdropaction(blockid)
          }
          lastsuccessfulblock = idx
        }
        loginfo(s"after dropping ${selectedblocks.size} blocks, " +
          s"free memory is ${utils.bytestostring(maxmemory - blocksmemoryused)}")
        freedmemory
      } finally {
        // Like blockmanager.doput, we use a finally rather than a catch to avoid having to deal
        // with interruptedexception.
        if (lastsuccessfulblock != selectedblocks.size - 1) {
          // The blocks we didn't process successfully are still locked, so we have to unlock them.
          (lastsuccessfulblock + 1 until selectedblocks.size).foreach { idx =>
            val blockid = selectedblocks(idx)
            blockinfomanager.unlock(blockid)
          }
        }
      }
    } else {
      // Not enough memory can be freed: abort and release every write lock taken above.
      blockid.foreach { id =>
        loginfo(s"will not store $id")
      }
      selectedblocks.foreach { id =>
        blockinfomanager.unlock(id)
      }
      0l
    }
  }
}
- 如果存储级别允许存到磁盘,那么先溢写到磁盘上
- 将block从memorystore内部的map结构中移除掉
- 向driver上的blockmanagermaster汇报块更新
- 向任务度量系统汇报块更新的统计信息
所以,七绕八绕,绕了这么一大圈,其实所谓的内存挤占,其实就是把引用设为null ^_^ 当然肯定不是这么简单啦,其实在整个分析的过程中我们也能发现,所谓的内存管理大部分工作就是对任务使用内存一些簿记量的管理维护,这里面有一些比较复杂的逻辑,例如给每个任务分配多少内存的计算逻辑就比较复杂。
/**
 * Drop a block from memory, first writing it to disk when its storage level uses disk and
 * the disk store does not already contain it.
 *
 * The caller must hold the write lock on the block (asserted on entry).
 *
 * @param blockid the block to drop
 * @param data    thunk producing the block's contents, either as an array of values or as
 *                a chunked byte buffer
 * @return the storage level of the block's status after the drop
 */
private[storage] override def dropfrommemory[t: classtag](
    blockid: blockid,
    data: () => either[array[t], chunkedbytebuffer]): storagelevel = {
  loginfo(s"dropping block $blockid from memory")
  val info = blockinfomanager.assertblockislockedforwriting(blockid)
  var blockisupdated = false
  val level = info.level

  // Drop to disk, if storage level requires: spill the block before it leaves memory.
  if (level.usedisk && !diskstore.contains(blockid)) {
    loginfo(s"writing block $blockid to disk")
    data() match {
      case left(elements) =>
        // Deserialized values: serialize them into the disk store's channel.
        diskstore.put(blockid) { channel =>
          val out = channels.newoutputstream(channel)
          serializermanager.dataserializestream(
            blockid,
            out,
            elements.toiterator)(info.classtag.asinstanceof[classtag[t]])
        }
      case right(bytes) =>
        // Already serialized: write the bytes as they are.
        diskstore.putbytes(blockid, bytes)
    }
    blockisupdated = true
  }

  // Actually drop from memory store. The size is read before removal so it can be reported.
  val droppedmemorysize =
    if (memorystore.contains(blockid)) memorystore.getsize(blockid) else 0l
  val blockisremoved = memorystore.remove(blockid)
  if (blockisremoved) {
    blockisupdated = true
  } else {
    logwarning(s"block $blockid could not be dropped from memory as it does not exist")
  }

  val status = getcurrentblockstatus(blockid, info)
  // Report the new block status when the block info asks for the master to be told.
  if (info.tellmaster) {
    reportblockstatus(blockid, status, droppedmemorysize)
  }
  // Record the block update in the task metrics.
  if (blockisupdated) {
    addupdatedblockstatustotaskmetrics(blockid, status)
  }
  status.storagelevel
}
其中,存储内存向执行内存借用的逻辑相对简单,仅仅是将两个内存池的大小改一下,执行内存池减少一定的大小,存储内存池则增加相应的大小。
/**
 * Try to acquire `numbytes` of storage memory for the given block.
 *
 * A request larger than the storage limit of the selected memory mode is rejected
 * immediately. Otherwise, when the storage pool's free memory does not cover the request,
 * free memory is moved over from the execution pool before the storage pool is asked.
 *
 * @return false when the request exceeds the limit, otherwise the storage pool's answer
 */
override def acquirestoragememory(
    blockid: blockid,
    numbytes: long,
    memorymode: memorymode): boolean = synchronized {
  assertinvariants()
  assert(numbytes >= 0)
  val (executionpool, storagepool, maxmemory) = memorymode match {
    case memorymode.on_heap =>
      (onheapexecutionmemorypool, onheapstoragememorypool, maxonheapstoragememory)
    case memorymode.off_heap =>
      (offheapexecutionmemorypool, offheapstoragememorypool, maxoffheapstoragememory)
  }
  if (numbytes > maxmemory) {
    // Fail fast if the block simply won't fit.
    loginfo(s"will not store $blockid as the required space ($numbytes bytes) exceeds our " +
      s"memory limit ($maxmemory bytes)")
    false
  } else {
    val shortfall = numbytes - storagepool.memoryfree
    if (shortfall > 0) {
      // The storage pool is short on free memory: borrow free memory from the execution
      // pool. Borrowing only moves pool-size bookkeeping from one pool to the other.
      val borrowed = math.min(executionpool.memoryfree, shortfall)
      executionpool.decrementpoolsize(borrowed)
      storagepool.incrementpoolsize(borrowed)
    }
    // Finally ask the storage pool itself for the memory.
    storagepool.acquirememory(blockid, numbytes)
  }
}
/**
 * Acquire `numbytestoacquire` bytes for the given block, evicting blocks first when
 * `numbytestofree` is positive.
 *
 * @param blockid           the block the memory is acquired for
 * @param numbytestoacquire size of the block
 * @param numbytestofree    amount of space to free by evicting blocks
 * @return whether the requested bytes were granted
 */
def acquirememory(
    blockid: blockid,
    numbytestoacquire: long,
    numbytestofree: long): boolean = lock.synchronized {
  assert(numbytestoacquire >= 0)
  assert(numbytestofree >= 0)
  assert(memoryused <= poolsize)
  // Evict blocks through the memory store first when space has to be freed.
  if (numbytestofree > 0) {
    memorystore.evictblockstofreespace(some(blockid), numbytestofree, memorymode)
  }
  // Note: if the memory store evicts blocks, then those evictions will synchronously call
  // back into this storagememorypool in order to free memory. Therefore memoryfree is read
  // only now, after any eviction has taken place.
  if (numbytestoacquire <= memoryfree) {
    _memoryused += numbytestoacquire
    true
  } else {
    false
  }
}
/**
 * Acquire `numbytes` of unroll memory for the given block. This simply delegates to
 * `acquirestoragememory` with the same arguments.
 */
override def acquireunrollmemory(
    blockid: blockid,
    numbytes: long,
    memorymode: memorymode): boolean =
  synchronized(acquirestoragememory(blockid, numbytes, memorymode))
上一篇: 大数据之Zookeeper概述
下一篇: cookie清除及其他操作
netty源码解解析(4.0)-23 ByteBuf内存管理:分配和释放
16.Spark Streaming源码解读之数据清理机制解析 sparkSpark Streaming源码解析RDD数据清理
Spark 源码解析 : DAGScheduler中的DAG划分与提交
Spark 源码解析 : DAGScheduler中的DAG划分与提交
6.Spark streaming技术内幕 : Job动态生成原理与源码解析
16.Spark Streaming源码解读之数据清理机制解析 sparkSpark Streaming源码解析RDD数据清理
6.Spark streaming技术内幕 : Job动态生成原理与源码解析
Spark源码分析 集群架构介绍和SparkContext源码解析
Spark DAGSchduler stage划分原理与源码解析