
Using Spark Adaptive Execution (AE) to control the number of small output files

程序员文章站 2022-07-14 19:50:19

Scenario:

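We usually set spark.sql.shuffle.partitions to control the number of reduce tasks. That value is fixed, however, while the appropriate number of reducers differs from job to job. For a small job, an oversized reducer count hurts runtime efficiency and also strongly affects the number of output files, since each reduce task typically writes its own file.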

 

Description:

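Spark adaptive execution can dynamically control the number of shuffle-read partitions, adjusting the number of reduce tasks at runtime based on the actual shuffle output size.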
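To make the idea of adjusting the shuffle-read partition count concrete, here is a minimal sketch in plain Scala with no Spark dependency. It assumes a greedy rule: adjacent pre-shuffle partitions are packed into one reduce task until adding the next one would exceed a target size. The names `CoalesceSketch` and `startIndices` are hypothetical, and the code illustrates the idea rather than reproducing Spark's implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, for illustration only.
object CoalesceSketch {
  // Given the byte size of each pre-shuffle partition and a target size,
  // return the index at which each post-shuffle (reduce) partition starts.
  def startIndices(bytesByPartition: Array[Long], targetSize: Long): Array[Int] = {
    val starts = ArrayBuffer(0) // the first reduce partition always starts at 0
    var currentSize = 0L
    for (i <- bytesByPartition.indices) {
      val next = bytesByPartition(i)
      if (i > 0 && currentSize + next > targetSize) {
        // Adding this partition would overshoot the target: start a new reduce partition.
        starts += i
        currentSize = next
      } else {
        currentSize += next
      }
    }
    starts.toArray
  }
}
```

With partition sizes 10, 20, 30, 40, 50 and a target of 60, the first three partitions are merged (exactly 60), and the last two each stay on their own, so five pre-shuffle partitions become three reduce tasks and, typically, three output files.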

 

Configuration:

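spark.sql.adaptive.enabled  enables adaptive execution
spark.sql.adaptive.minNumPostShufflePartitions  minimum number of partitions in the reduce stage
spark.sql.adaptive.shuffle.targetPostShuffleInputSize  target amount of input data per reduce-stage task, 64 MB by default
spark.sql.adaptive.shuffle.targetPostShuffleRowCount  number of rows processed per reduce partition (no longer present in spark-2.3)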
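As a rough illustration, the settings above could be applied when building a SparkSession, as in the sketch below. It assumes Spark 2.3 on the classpath and a master supplied by spark-submit; the application name and the chosen values are placeholders, not recommendations.

```scala
import org.apache.spark.sql.SparkSession

object AdaptiveExecutionConfig {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ae-small-files-demo") // hypothetical application name
      // Turn on adaptive execution.
      .config("spark.sql.adaptive.enabled", "true")
      // Target input size per reduce task, in bytes (64 MB here, an illustrative value).
      .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")
      // Lower bound on the number of reduce partitions (illustrative value).
      .config("spark.sql.adaptive.minNumPostShufflePartitions", "1")
      .getOrCreate()

    // Read a setting back to confirm it was applied to the session.
    println(spark.conf.get("spark.sql.adaptive.enabled"))

    spark.stop()
  }
}
```

The same keys can also be passed on the command line with `--conf key=value` when submitting the job.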


Source code analysis:

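Taking spark-2.3 as an example, look at the ExchangeCoordinator class. If a minimum number of partitions is set (spark.sql.adaptive.minNumPostShufflePartitions), the per-partition input size is derived from it: the total shuffle data size divided by the minimum partition count, capped at spark.sql.adaptive.shuffle.targetPostShuffleInputSize. If no minimum is set, spark.sql.adaptive.shuffle.targetPostShuffleInputSize is used directly as the per-partition input size.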
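The rule described above can be tried out in isolation. The following standalone sketch mirrors the target-size computation without any Spark dependency; `TargetSizeSketch` and `targetInputSize` are hypothetical names, and the byte values in the usage note are made up for the example.

```scala
// Hypothetical helper, for illustration only.
object TargetSizeSketch {
  // totalBytes:       total shuffle output size across all map outputs
  // advisoryBytes:    value of spark.sql.adaptive.shuffle.targetPostShuffleInputSize
  // minNumPartitions: value of spark.sql.adaptive.minNumPostShufflePartitions, if set
  def targetInputSize(
      totalBytes: Long,
      advisoryBytes: Long,
      minNumPartitions: Option[Int]): Long =
    minNumPartitions match {
      case Some(n) =>
        // Size each partition would need so that at least n partitions exist;
        // the floor of 16 keeps the result above zero for empty input.
        val perPartition = math.max(math.ceil(totalBytes / n.toDouble).toLong, 16L)
        math.min(perPartition, advisoryBytes)
      case None =>
        advisoryBytes
    }
}
```

For 100 MB of shuffle data (104857600 bytes), a 64 MB advisory size, and a minimum of 10 partitions, the target becomes 10485760 bytes (10 MB) instead of 64 MB; with no minimum set it stays at 67108864 bytes. The Spark source that follows applies the same rule.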
  def estimatePartitionStartIndices(
      mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
    // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
    // a stage when the number of partitions of this dependency is 0.
    assert(mapOutputStatistics.length <= numExchanges)

    // If minNumPostShufflePartitions is defined, it is possible that we need to use a
    // value less than advisoryTargetPostShuffleInputSize as the target input size of
    // a post shuffle task.
    // If the minimum partition count is set, the per-partition input size is
    // min(total data size / minimum partition count, configured per-partition input size).
    val targetPostShuffleInputSize = minNumPostShufflePartitions match {
      case Some(numPartitions) =>
        val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
        // The max at here is to make sure that when we have an empty table, we
        // only have a single post-shuffle partition.
        // There is no particular reason that we pick 16. We just need a number to
        // prevent maxPostShuffleInputSize from being set to 0.
        val maxPostShuffleInputSize =
          math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16)
        math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)

      case None => advisoryTargetPostShuffleInputSize
    }

    logInfo(
      s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
      s"targetPostShuffleInputSize $targetPostShuffleInputSize.")