欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark调优 — 开发调优

程序员文章站 2022-06-01 12:45:55
...

发挥分布式并行处理优势,降低代码的耦合度,实现不同部分代码能够并行执行,减少前后依赖的等待时间

  • 避免创建重复RDD,尽可能复用一个RDD
    代码上精简,提升复用率

  • 多次使用的RDD进行持久化

  1. 建议使用MEMORY_AND_DISK_SER 持久化级别
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
持久化级别 含义解释
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。
  1. 对于dataframe lineage过长的情况,可以考虑使用rdd缓存替代dataframe缓存
    因为rdd 不会带有schema相关信息,缓存信息较少,先把df转成rdd,然后缓存rdd,之后可以再把rdd转成df进行使用
  • 避免RDD lineage 过于复杂
    拆解transfomaation 复杂、shuffle过多的Job,生成中间rdd,然后persist, 通过如count这样的action算子触发计算后供重复使用

  • 避免shuffle操作

  1. 尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子
  2. 广播小表,实现map端join:1. Sql 语句中直接使用broadcast hint 2. 设置自动广播阈值参数:-- conf “spark.sql.autoBroadcastJoinThreshold=304857600”
  • 使用map-side预聚合的shuffle操作

    如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子
    在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差

  • 使用高性能算子

  1. 使用reduceByKey/aggregateByKey替代groupByKey
  2. 使用mapPartitions替代普通map: 可能会造成内存溢出,慎用,可以参考自定义迭代器的方式处理
  3. 使用foreachPartitions替代foreach
  4. 使用filter之后进行coalesce操作: 避免低压力task过多浪费资源
  5. 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
  • 多表Join 顺序调整
    小表写在左边;
    调整Join顺序,先进行能够过滤掉更多数据的Join操作,进而减少后续shuffle数据量

  • Window窗口函数执行性能优化

    调参spark.sql.windowExec.buffer.spill.threshold : spill到磁盘文件的数据量阈值大小
    假设JVM实际堆内存大小为M(GB),处理的数据条数为n,大小为size(GB),那么threshold需要满足 :

  1. 单个文件不超过executor内存: size * threshold / n < M;
  2. 读取shuffle文件的缓冲区不超过executor内存:n/(1024 * threshold) < M (shuffle read 读取每个文件都需要1M的缓冲区空间,文件总数n/threshold,除以1024即需要的内存G大小)
相关标签: Spark