Spark调优 数据本地化调优
1. 数据本地化背景
数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码在一起(一个work节点上),那么性能会非常的高。但是如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上,通常来说移动计算比移动数据速度要快
,因为代码很小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。
2. 源代码
package org.apache.spark.scheduler
import org.apache.spark.annotation.DeveloperApispark
@DeveloperApi
object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
type TaskLocality = Value
def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
// condition:条件 小于等于 constraint:约束
condition <= constraint
}
}
/**
* 什么是NO_PREF?
* 当Driver应用程序刚刚启动,Driver分配获得的Executor很可能还没有初始化完毕。所以会有一部分任务的本地化级别被设置为NO_PREF,
* 如果是ShuffleRDD,其本地行始终为NO_PREF,对于这两种本地化级别是NO_PREF的情况,在任务分配时会被优先分配到非本地节点执行,
* 达到一定的优化效果。
*
* PROCESS_LOCAL: 数据在同一个 JVM 中,即同一个 executor 上。这是最佳数据 locality。
*
* NODE_LOCAL: 数据在同一个节点上。比如数据在同一个节点的另一个 executor上;或在 HDFS 上,恰好有 block 在同一个节点上。速度比
* PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取
*
* NO_PREF: 数据从哪里访问都一样快,不需要位置优先
*
* RACK_LOCAL: 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢
*
* ANY: 数据在非同一机架的网络上,速度最慢
*
*/
3. 图解
假设1是要计算的数据,2是Driver会调度一个task去计算1的数据,正常情况下会生成3这个task,然后在本executor中直接获取数据执行
。但是如果3在一定等待的阈值内一直获取不到资源去运行,此时恰好另外一个executor是有资源可以运行的,那么就会生成4 task,然后4去跨executor获取数据运行
。
这里有两个注意点:
1. 一个是阈值设置可以根据我们的情况设置大点
2. 4 task是跨executor取数据了,速度会比3 task要慢一点
其他情况和这个相同
4.Spark中的数据本地化流程图
即某个task 计算节点与其输入数据的位置关系,下面将要挖掘Spark 的调度系统如何产生这个结果,这一过程涉及 RDD、DAGScheduler、TaskScheduler,搞懂了这一过程也就基本搞懂了 Spark 的 PreferredLocations(位置优先策略)
- 第一步:PROCESS_LOCAL–>TaskScheduler首先根据数据所在的节点发送task,
如果task在Worker1的Executor1中等待了3s(这个3s是spark的默认等待时间,通过spark.locality.wait来设置,可以在SparkConf()中修改),重试了5次,还是无法执行 TaskScheduler会降低数据本地化的级别,从PROCESS_LOCAL降到NODE_LOCAL - 第二步:NODE_LOCAL–>TaskScheduler重新发送task到Worker1中的Executor2中执行, 如果task在Worker1的Executor2中等待了3s,重试了5次,还是无法执行
TaskScheduler会降低数据本地化的级别,从NODE_LOCAL降到RACK_LOCAL - 第三步:RACK_LOCAL–>TaskScheduler重新发送task到Worker2中的Executor1中执行。
- 第四步:当task分配完成之后,task会通过所在Worker的Executor中的 BlockManager来获取数据,如果BlockManager发现自己没有数据,那么它会调用getRemote方法,通过ConnectionManager与原task所在节点的BlockManager中的ConnectionManager先建立连接,然后通过TransferService(网络传输组件)获取数据,通过网络传输回task所在节点(这时候性能大幅下降,大量的网络IO占用资源),计算后的结果返回给Driver。
4.优化
/**
* 优化:spark.locality.wait
* 用于获取各个本地化级别的等待时间。
*
* 变量名称 描述 默认值
* spark.locality.wait 本地化级别的默认等待时间(全局) 3000
* spark.locality.wait.process 本地进程的等待时间 3000
* spark.locality.wait.node 本地节点的等待时间 3000
* spark.locality.wait.rack 本地机架的等待时间 3000
*
*
* 提示:在运行的任务时间很长而且数量较多的情况下,适当调高这些参数可以显著提高性能。
* 然而当这些参数值都已经超过任务运行的时长时,需要调小这些参数。
*
*/
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = conf.get("spark.locality.wait", "3s")
val localityWaitKey = level match {
case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
case _ => null
}
if (localityWaitKey != null) {
conf.getTimeAsMs(localityWaitKey, defaultWait)
} else {
0L
}
}
5.优化的时机
观察日志,spark作业的运行日志
,推荐大家在测试的时候,先用client
模式,在本地就直接可以看到比较全的日志。
日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL,观察大部分task的数据本地化级别
。
如果大多都是PROCESS_LOCAL,那就不用调节了
如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长
调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志
看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短
但是注意别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了。
上一篇: 电商项目(二十一)购物车(登陆情况下)
下一篇: 电商项目(十六)--缓存系统