Using Spark AE (Adaptive Execution) to control the number of small output files
程序员文章站
2022-07-14 19:50:19
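Scenario:
We usually set spark.sql.shuffle.partitions to control the number of reduce tasks, but the right number differs from job to job. For small jobs, a fixed value that is too large hurts runtime efficiency and also greatly inflates the number of output files.
Description:
Spark adaptive execution dynamically controls the number of shuffle-read partitions, so the number of reduce tasks is adjusted at runtime.
Configuration parameters: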
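spark.sql.adaptive.enabled | enables adaptive execution |
spark.sql.adaptive.minNumPostShufflePartitions | minimum number of partitions in the reduce stage |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize | target amount of data processed by each reduce-stage task, default 64 MB |
spark.sql.adaptive.shuffle.targetPostShuffleRowCount | number of rows processed by each reduce partition (no longer present in spark-2.3) |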
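As a minimal sketch of how the first three parameters might be set from application code, assuming Spark 2.3 with spark-sql on the classpath and a job launched through spark-submit (so the master is supplied externally). The application name and the concrete values are illustrative assumptions, not recommendations from the original post.

```scala
import org.apache.spark.sql.SparkSession

object AdaptiveExecutionConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ae-small-files-sketch") // hypothetical application name
      // Turn on adaptive execution.
      .config("spark.sql.adaptive.enabled", "true")
      // Illustrative lower bound on the number of reduce-stage partitions.
      .config("spark.sql.adaptive.minNumPostShufflePartitions", "10")
      // Illustrative target of 128 MB per reduce task, expressed in bytes.
      .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
        (128L * 1024 * 1024).toString)
      .getOrCreate()

    // Read the setting back to confirm it is visible in the session.
    println(spark.conf.get("spark.sql.adaptive.enabled"))

    spark.stop()
  }
}
```

The same keys can also be passed as `--conf key=value` pairs to spark-submit instead of being hard-coded.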
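Source code analysis:
Taking spark-2.3 as an example, see the ExchangeCoordinator class. If a minimum number of partitions is set (spark.sql.adaptive.minNumPostShufflePartitions), the target input size per partition is min(total shuffle data size / minimum number of partitions, spark.sql.adaptive.shuffle.targetPostShuffleInputSize). If no minimum is set, spark.sql.adaptive.shuffle.targetPostShuffleInputSize is used directly as the target input size per partition.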
def estimatePartitionStartIndices(
    mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
  // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
  // a stage when the number of partitions of this dependency is 0.
  assert(mapOutputStatistics.length <= numExchanges)

  // If minNumPostShufflePartitions is defined, it is possible that we need to use a
  // value less than advisoryTargetPostShuffleInputSize as the target input size of
  // a post shuffle task.
  // If the minimum number of partitions is set, the input size per partition =
  // min(total data size / minimum number of partitions, configured per-partition input size)
  val targetPostShuffleInputSize = minNumPostShufflePartitions match {
    case Some(numPartitions) =>
      val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
      // The max at here is to make sure that when we have an empty table, we
      // only have a single post-shuffle partition.
      // There is no particular reason that we pick 16. We just need a number to
      // prevent maxPostShuffleInputSize from being set to 0.
      val maxPostShuffleInputSize =
        math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16)
      math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)

    case None => advisoryTargetPostShuffleInputSize
  }

  logInfo(
    s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
    s"targetPostShuffleInputSize $targetPostShuffleInputSize.")
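To make the arithmetic in the excerpt above concrete, here is a small standalone sketch that repeats the same target-size selection outside of Spark. The object name, the function name, and the use of `Seq[Array[Long]]` as a stand-in for `Array[MapOutputStatistics]` are assumptions made for illustration; only the formula itself comes from the excerpt.

```scala
object TargetSizeSketch {
  // Same selection logic as the excerpt, with plain collections instead of
  // Spark's MapOutputStatistics (hypothetical stand-in: one Array[Long] of
  // per-partition byte counts per shuffle).
  def targetPostShuffleInputSize(
      bytesByPartitionPerShuffle: Seq[Array[Long]],
      minNumPostShufflePartitions: Option[Int],
      advisoryTargetPostShuffleInputSize: Long): Long = {
    minNumPostShufflePartitions match {
      case Some(numPartitions) =>
        val total = bytesByPartitionPerShuffle.map(_.sum).sum
        // The floor of 16 bytes keeps the target from becoming 0 on empty input.
        val maxSize = math.max(math.ceil(total / numPartitions.toDouble).toLong, 16L)
        math.min(maxSize, advisoryTargetPostShuffleInputSize)
      case None => advisoryTargetPostShuffleInputSize
    }
  }

  def main(args: Array[String]): Unit = {
    val advisory = 64L * 1024 * 1024                   // 64 MB default target
    val oneGiB = Seq(Array.fill(4)(256L * 1024 * 1024)) // 4 map-side partitions, 1 GiB total

    // Minimum of 200 partitions: ceil(1 GiB / 200) is smaller than 64 MB, so it wins.
    println(targetPostShuffleInputSize(oneGiB, Some(200), advisory)) // 5368710
    // Minimum of 4 partitions: 1 GiB / 4 = 256 MB, capped by the 64 MB advisory size.
    println(targetPostShuffleInputSize(oneGiB, Some(4), advisory))   // 67108864
    // No minimum: the advisory size is used directly.
    println(targetPostShuffleInputSize(oneGiB, None, advisory))      // 67108864
    // Empty input with a minimum set: the floor of 16 bytes applies.
    println(targetPostShuffleInputSize(Seq.empty, Some(200), advisory)) // 16
  }
}
```

The first case shows why a large minimum partition count works against the goal of fewer output files: it lowers the per-task target size, which in turn tends to produce more post-shuffle partitions.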