Spark调优---开发调优
开发调优
1:避免创建重复的RDD
场景:对一份数据执行多次算子操作时,只使用一个RDD。rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,结合“3:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。
2:尽可能复用同一个RDD
比如一个rdd1为key-value,另一个rdd2是其中的value,又需要分别对这两个进行后续操作,那么这种情况直接就在原始数据上处理就行了。意思是尽量减少无用的rdd操作。
3:对多次使用的RDD进行持久化
Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。
因此对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。
3.1:MEMORY_ONLY:
未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
3.2:MEMORY_AND_DISK:
使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
3.3:MEMORY_ONLY_SER:
基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。更节省内存,避免占用过多内存导致频繁GC。
3.4:MEMORY_AND_DISK_SER:
基本含义同MEMORY_AND_DISK。同上
3.5:DISK_ONLY
使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
3.6:MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等.
后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。
综上:
1:只有内存(是否序列化,是否赋值一份)
2:内存加磁盘(是否序列化,是否赋值一份)
3:只有磁盘
如何选择:
尝试1:
内存足够大(放下整个rdd)首选性能最高MEMORY_ONLY
1:因为不进行序列化与反序列化操作,就避免了这部分的性能开销;
2:对RDD后续操作,基于纯内存,不需要从磁盘文件中读取数据,
3:而且不需要复制一份数据副本,并远程传送到其他节点上。
在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。怎么查看rdd占用量?
尝试2:
尝试1时发生了内存溢出,那么尝试使用MEMORY_ONLY_SER级别。
将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,降低了内存占用。多出来序列化与反序列化的开销。如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
尝试3:
纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。
说明RDD的数据量很大。序列化后的数据比较少,可以节省内存和磁盘的空间开销。优先缓存在内存中,不下才会写入磁盘。
注意:
不建议使用DISK_ONLY和后缀为_2的级别:完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。
4:尽量避免使用shuffle类算子
最消耗性能就是shuffle过程。就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。
shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。
能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。
Broadcast与map进行join代码示例
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
5:使用map-side预聚合的shuffle操作
一定要使用shuffle操作,无法用map类的算子来替代,尽量使用可以map-side预聚合的算子。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
6:使用高性能的算子
6.1:使用reduceByKey/aggregateByKey替代groupByKey。
6.2:使用mapPartitions替代普通map
mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!
6.3:使用foreachPartitions替代foreach
将RDD数据写MySQL,普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;foreachPartitions对于每个partition,只要创建一个数据库连接即可,执行批量插入操作,性能高。实践发现1万条数据量写MySQL,性能提升30%以上。
6.4:使用filter之后进行coalesce操作
执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),使用coalesce算子,将RDD数据压缩到更少的partition中去。filter之后,RDD的每个partition中都会有很多数据被过滤掉,后续每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。
6.5:使用repartitionAndSortWithinPartitions替代repartition与sort类操作
repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
7:广播大变量
算子函数中使用外部变量的场景(100M以上的大集合),应该使用Broadcast来提升性能。
默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,每个task都有一个变量副本。变量比较大(比如100M,甚至1G),大量的副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。
广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,Executor中的task执行时共享那份变量副本。可以大大减少变量副本的数量,并减少对Executor内存的占用开销,降低GC的频率。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)
8:使用Kryo优化序列化性能
9:优化数据结构
Java中,有三种类型比较耗费内存:
9.1:对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
9.2:字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
9.3:集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。
尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。
如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。因此笔者建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。
上一篇: 插件
下一篇: Elasticsearch集群部署安装