Spark性能优化指南——初级篇
程序员文章站
2023-02-20 15:06:49
原文来我的公众号:Spark性能优化指南——初级篇 一. Spark作业原理 我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。该进程是向集群管理器(Yarn,K8s)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。 ......
原文来我的公众号:spark性能优化指南——初级篇
一. spark作业原理
我们使用spark-submit提交一个spark作业之后,这个作业就会启动一个对应的driver进程。该进程是向集群管理器(yarn,k8s)申请运行spark作业需要使用的资源,这里的资源指的就是executor进程。
yarn集群管理器会根据我们为spark作业设置的资源参数,在各个工作节点上,启动一定数量的executor进程,每个executor进程都占有一定数量的内存和cpu core。
在申请到了作业执行所需的资源之后,driver进程就会开始调度和执行我们编写的作业代码了。
driver进程会将我们编写的spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个executor进程中执行。
task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。
一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后driver就会调度运行下一个stage。
下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。
spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reducebykey、join等),那么就会在该算子处,划分出一个stage界限来。
可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。
因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reducebykey()算子接收的函数)。这个过程就是shuffle。
当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到executor进程的内存或者所在节点的磁盘文件中。
因此executor的内存主要分为三块:
第一块是让task执行我们自己编写的代码时使用,默认是占executor总内存的20%;
第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占executor总内存的20%;
第三块是让rdd持久化时使用,默认占executor总内存的60%。
task的执行速度是跟每个executor进程的cpu core数量有直接关系的。一个cpu core同一时间只能执行一个线程。而每个executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。
如果cpu core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。
二.核心调优参数
num-executors:
该参数用于设置spark作业总共要用多少个executor进程来执行。driver在向yarn集群管理器申请资源时,yarn集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的executor进程,此时你的spark作业的运行速度是非常慢的。(建议50~100个左右的executor进程)
executor-memory:
该参数用于设置每个executor进程的内存。executor内存的大小,很多时候直接决定了spark作业的性能,而且跟常见的jvm oom异常,也有直接的关联。(根据作业大小不同,建议设置4g~8g,num-executors乘以executor-memory,是不能超过队列的最大内存量的)
executor-cores:
该参数用于设置每个executor进程的cpu core数量。这个参数决定了每个executor进程并行执行task线程的能力。因为每个cpu core同一时间只能执行一个task线程,因此每个executor进程的cpu core数量越多,越能够快速地执行完分配给自己的所有task线程。(建议设置为2~4个,且num-executors * executor-cores不要超过队列总cpu core的1/3~1/2)
driver-memory:
该参数用于设置driver进程的内存(建议设置512m到1g)。
spark.default.parallelism:
该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的spark作业性能。(建议为50~500左右,缺省情况下spark自己根据底层hdfs的block数量来设置task的数量,默认是一个hdfs block对应一个task。spark官网建议设置该参数为num-executors * executor-cores的2~3倍较为合适)
spark.storage.memoryfraction:
该参数用于设置rdd持久化数据在executor内存中能占的比例,默认是0.6(原则上是尽可能保证数据能够全部在内存中,但如果发现作业发生频繁的gc,就该考虑是否调小)
spark.shuffle.memoryfraction:
该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的executor内存的比例,默认是0.2。也就是说,executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。(shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能)
微信扫描二维码,关注我的公众号
我的个人网站:
上一篇: Hadoop_HDFS_02
下一篇: Hadoop_MapReduce_03