
Spark 3.0 Application Scheduling Algorithm Explained


The Application scheduling algorithm differs noticeably across Spark versions: from Spark 1.3.0 to Spark 1.6.1, through Spark 2.x, and up to the latest Spark 3.x, the algorithm has been revised several times. In this post we walk through the Application scheduling mechanism of the latest release, Spark 3.0.

private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps) {
    // If the spark-submit script specified cores per executor, each executor gets that
    // many cores; otherwise each executor gets 1 core by default.
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left is less than the coresPerExecutor, the cores left will not be allocated:
    // the app's remaining cores must be at least the per-executor core count.
    if (app.coresLeft >= coresPerExecutor) {
      // Filter out workers that don't have enough resources to launch an executor:
      // keep workers that are ALIVE and can still launch an executor,
      // sorted by free cores in descending order.
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canLaunchExecutor(_, app.desc))
        .sortBy(_.coresFree).reverse
      if (waitingApps.length == 1 && usableWorkers.isEmpty) {
        logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
      }
      // By default the spreadOutApps strategy is used, spreading the executors the
      // application needs across multiple workers.
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}
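Before moving on, here is a minimal standalone sketch of the filtering and sorting step above, using a toy case class instead of the real WorkerInfo and a made-up per-executor request of 2 cores and 2048 MB: keep only alive workers with enough resources, then sort by free cores in descending order.

// Sketch only: ToyWorker and the request numbers are hypothetical, not Spark classes.
case class ToyWorker(id: String, alive: Boolean, coresFree: Int, memoryFreeMb: Int)

object UsableWorkersSketch {
  def main(args: Array[String]): Unit = {
    val workers = Seq(
      ToyWorker("w1", alive = true,  coresFree = 2, memoryFreeMb = 4096),
      ToyWorker("w2", alive = false, coresFree = 8, memoryFreeMb = 8192),
      ToyWorker("w3", alive = true,  coresFree = 6, memoryFreeMb = 8192))

    val usableWorkers = workers
      .filter(_.alive)                                          // state == ALIVE
      .filter(w => w.coresFree >= 2 && w.memoryFreeMb >= 2048)  // enough for one executor
      .sortBy(_.coresFree).reverse                              // most free cores first

    println(usableWorkers.map(_.id))  // List(w3, w1): w2 is dropped because it is not alive
  }
}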
canLaunchExecutor determines whether a worker can launch an executor for the application:
private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = {
  canLaunch(
    worker,
    desc.memoryPerExecutorMB,
    desc.coresPerExecutor.getOrElse(1),
    desc.resourceReqsPerExecutor)
}
Let's take a look at the canLaunch method it delegates to:
private def canLaunch(
    worker: WorkerInfo,
    memoryReq: Int,
    coresReq: Int,
    resourceRequirements: Seq[ResourceRequirement])
  : Boolean = {
  // The worker's free memory must be at least the requested memory.
  val enoughMem = worker.memoryFree >= memoryReq
  // The worker's free cores must be at least the requested cores.
  val enoughCores = worker.coresFree >= coresReq
  // The worker must also satisfy the executor's custom resource requirements (e.g. GPUs).
  val enoughResources = ResourceUtils.resourcesMeetRequirements(
    worker.resourcesAmountFree, resourceRequirements)
  enoughMem && enoughCores && enoughResources
}
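To make the three checks concrete, here is a small self-contained sketch with made-up worker and request numbers; the GPU map stands in for resourcesAmountFree and the ResourceRequirement list, which are not reproduced here.

object CanLaunchSketch {
  // Hypothetical free resources on one worker.
  val memoryFreeMb  = 8192
  val coresFree     = 4
  val resourcesFree = Map("gpu" -> 2)

  // Hypothetical per-executor request.
  val memoryReqMb  = 4096
  val coresReq     = 2
  val resourceReqs = Map("gpu" -> 1)

  def canLaunch: Boolean = {
    val enoughMem   = memoryFreeMb >= memoryReqMb
    val enoughCores = coresFree >= coresReq
    // Same idea as ResourceUtils.resourcesMeetRequirements: every requested
    // resource name must have at least the requested amount free.
    val enoughResources = resourceReqs.forall { case (name, amount) =>
      resourcesFree.getOrElse(name, 0) >= amount
    }
    enoughMem && enoughCores && enoughResources
  }

  def main(args: Array[String]): Unit =
    println(canLaunch)  // true: 8192 >= 4096, 4 >= 2, and 2 GPUs free >= 1 requested
}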

Now back to scheduleExecutorsOnWorkers, which was called above:
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  // If coresPerExecutor is not set in the spark-submit script, the oneExecutorPerWorker
  // mechanism is used: only one executor is started per worker for this application.
  // If coresPerExecutor is set and a worker has enough resources, multiple executors
  // may be started on that worker.
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  // Return whether the worker at this position can launch an executor for this app.
  def canLaunchExecutorForApp(pos: Int): Boolean = {

    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    val assignedExecutorNum = assignedExecutors(pos)

    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.

    // A new executor can be launched if coresPerExecutor was set in the spark-submit
    // script (multiple executors per worker allowed), or if this worker has not yet
    // been assigned an executor for this application.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
    // A new executor can be launched here:
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutorNum * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val assignedResources = resourceReqsPerExecutor.map {
        req => req.resourceName -> req.amount * assignedExecutorNum
      }.toMap
      val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
        case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
      }
      val enoughResources = ResourceUtils.resourcesMeetRequirements(
        resourcesFree, resourceReqsPerExecutor)
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
    } else {
      // We're adding cores to an existing executor, so no need
      // to check memory and executor limits:
      // the conditions for launching a new executor are not met, so extra cores
      // are added to the existing executor instead.
      keepScheduling && enoughCores
    }
  }

  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits

  var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutorForApp(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor

        // If we are launching one executor per worker, then every iteration assigns 1 core
        // to the executor. Otherwise, every iteration assigns cores to a new executor.
        if (oneExecutorPerWorker) {
          // oneExecutorPerWorker mode: the single executor on this worker simply receives
          // another minCoresPerExecutor cores (1 by default).
          assignedExecutors(pos) = 1
        } else {
          // Otherwise a new executor is started on this worker with minCoresPerExecutor
          // cores (the coresPerExecutor value from the spark-submit script).
          assignedExecutors(pos) += 1
        }

        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        if (spreadOutApps) {
          // keepScheduling is set to false so that each worker is assigned only one round
          // of cores before moving on to the next worker, until a full pass over the
          // workers is complete.
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
  }
  // Return the number of cores assigned on each worker.
  assignedCores
}
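To see what the two strategies produce, here is a simplified sketch that keeps only the core-counting part of the loop above (memory, custom resources, and the executor limit are ignored); the three workers with 4 free cores each and the 6-core request are made-up numbers.

object ScheduleSketch {
  // Distribute coresLeft across workers in rounds of minCoresPerExecutor,
  // mirroring only the core bookkeeping of scheduleExecutorsOnWorkers.
  def schedule(coresLeft: Int, coresFree: Array[Int],
               minCoresPerExecutor: Int, spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(coresFree.length)(0)
    var toAssign = math.min(coresLeft, coresFree.sum)
    def canLaunch(pos: Int): Boolean =
      toAssign >= minCoresPerExecutor &&
        coresFree(pos) - assigned(pos) >= minCoresPerExecutor
    var free = coresFree.indices.filter(canLaunch)
    while (free.nonEmpty) {
      free.foreach { pos =>
        var keep = true
        while (keep && canLaunch(pos)) {
          toAssign -= minCoresPerExecutor
          assigned(pos) += minCoresPerExecutor
          if (spreadOut) keep = false  // one round per worker, then move on
        }
      }
      free = free.filter(canLaunch)
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // App needs 6 cores, 3 workers with 4 free cores each, 2 cores per executor.
    println(schedule(6, Array(4, 4, 4), 2, spreadOut = true).mkString(","))  // 2,2,2
    println(schedule(6, Array(4, 4, 4), 2, spreadOut = false).mkString(",")) // 4,2,0
  }
}

With spreadOutApps on, the 6 cores are spread 2/2/2 across the three workers; with it off, the first worker is packed until full (4 cores), the second gets the remaining 2, and the third gets none.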

Finally, let's analyze allocateWorkerResourceToExecutors:
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
    // Register one more executor for this application.
    val exec = app.addExecutor(worker, coresToAssign, allocated)
    // Send the LaunchExecutor command to the worker.
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
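As a quick sanity check of the division above, assume a worker was assigned 6 cores (a made-up number; by this point assignedCores is always a multiple of the per-executor core count):

object DivideCoresSketch {
  // Mirror the two lines that compute executor count and cores per executor.
  def split(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
    val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    (numExecutors, coresToAssign)
  }

  def main(args: Array[String]): Unit = {
    // coresPerExecutor set to 2: three executors with 2 cores each are launched.
    println(split(6, Some(2)))  // (3,2)
    // coresPerExecutor not set: one executor grabs all 6 cores on this worker.
    println(split(6, None))     // (1,6)
  }
}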
OK, that completes our analysis of the Application scheduling algorithm in the latest Spark release, Spark 3.0!