Spark SQL Reading Parquet Files: Number of Partitions

程序员文章站 2022-07-14 12:22:19

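Whether Spark converts a Hive metastore Parquet table to its built-in Parquet reader is controlled by spark.sql.hive.convertMetastoreParquet, which defaults to true.
When it is true, the scan uses org.apache.spark.sql.execution.FileSourceScanExec; otherwise it uses org.apache.spark.sql.hive.execution.HiveTableScanExec.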

HiveTableScanExec

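~~Partitions are determined by the number and size of the files.
E.g., reading 2048M of data with the HDFS block size set to 128M:
if the directory holds 1000 small files, 1000 partitions are generated.
If there is only 1 file, 16 partitions are generated.
If there is one large 1024M file and the other 999 files total 1024M, 1007 partitions are generated.~~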

**In fact, each file corresponds to 2 partitions, as the source code shows:**

 

```scala
private[hive]
class HadoopTableReader(
    @transient private val attributes: Seq[Attribute],
    @transient private val partitionKeys: Seq[Attribute],
    @transient private val tableDesc: TableDesc,
    @transient private val sparkSession: SparkSession,
    hadoopConf: Configuration)
  extends TableReader with CastSupport with Logging {

  // Hadoop honors "mapreduce.job.maps" as hint,
  // but will ignore when mapreduce.jobtracker.address is "local".
  // https://hadoop.apache.org/docs/r2.6.5/hadoop-mapreduce-client/hadoop-mapreduce-client-core/
  // mapred-default.xml
  //
  // In order keep consistency with Hive, we will let it be 0 in local mode also.
  private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
    0 // will splitted based on block by default.
  } else {
    math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
      sparkSession.sparkContext.defaultMinPartitions)
  }
```

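In our cluster configuration, _minSplitsPerRDD evaluates to sparkSession.sparkContext.defaultMinPartitions, which Spark defines as math.min(defaultParallelism, 2), i.e. 2 whenever at least two cores are available. This value serves as the minimum-splits hint for the underlying Hadoop read, which is why each file ends up as 2 partitions.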
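The selection logic above can be sketched without a cluster. This is a minimal sketch, not Spark's code: `MinSplitsSketch` and its parameter names are hypothetical stand-ins for `sparkContext.isLocal`, the `mapreduce.job.maps` setting, and `sparkContext.defaultParallelism`.

```scala
object MinSplitsSketch {
  // SparkContext.defaultMinPartitions is defined as math.min(defaultParallelism, 2).
  def defaultMinPartitions(defaultParallelism: Int): Int =
    math.min(defaultParallelism, 2)

  // Mirrors the expression assigned to _minSplitsPerRDD in HadoopTableReader.
  // All three parameters are hypothetical inputs standing in for cluster state.
  def minSplitsPerRDD(isLocal: Boolean, mapreduceJobMaps: Int, defaultParallelism: Int): Int =
    if (isLocal) 0 // local mode: split purely by block
    else math.max(mapreduceJobMaps, defaultMinPartitions(defaultParallelism))

  def main(args: Array[String]): Unit = {
    // 40 cores and mapreduce.job.maps left at the fallback value of 1.
    println(minSplitsPerRDD(isLocal = false, mapreduceJobMaps = 1, defaultParallelism = 40))
    // A larger mapreduce.job.maps wins over defaultMinPartitions.
    println(minSplitsPerRDD(isLocal = false, mapreduceJobMaps = 8, defaultParallelism = 40))
  }
}
```

With the first set of inputs the hint is 2, which matches the "2 partitions per file" observation; raising `mapreduce.job.maps` is the only way this expression yields more than 2.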

 

FileSourceScanExec

**org.apache.spark.sql.execution.FileSourceScanExec:**
```scala
  /**
   * Create an RDD for non-bucketed reads.
   * The bucketed variant of this function is [[createBucketedReadRDD]].
   *
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   * @param fsRelation [[HadoopFsRelation]] associated with the read.
   */
  private def createNonBucketedReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Seq[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    val defaultMaxSplitBytes =
      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    val bytesPerCore = totalBytes / defaultParallelism

    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        val blockLocations = getBlockLocations(file)
        if (fsRelation.fileFormat.isSplitable(
            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
          (0L until file.getLen by maxSplitBytes).map { offset =>
            val remaining = file.getLen - offset
            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
            val hosts = getBlockHosts(blockLocations, offset, size)
            PartitionedFile(
              partition.values, file.getPath.toUri.toString, offset, size, hosts)
          }
        } else {
          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
          Seq(PartitionedFile(
            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
        }
      }
    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

    val partitions = new ArrayBuffer[FilePartition]
    val currentFiles = new ArrayBuffer[PartitionedFile]
    var currentSize = 0L

    /** Close the current partition and move to the next. */
    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) {
        val newPartition =
          FilePartition(
            partitions.size,
            currentFiles.toArray.toSeq) // Copy to a new Array.
        partitions += newPartition
      }
      currentFiles.clear()
      currentSize = 0
    }

    // Assign files to partitions using "Next Fit Decreasing"
    splitFiles.foreach { file =>
      if (currentSize + file.length > maxSplitBytes) {
        closePartition()
      }
      // Add the given file to the current partition.
      currentSize += file.length + openCostInBytes
      currentFiles += file
    }
    closePartition()

    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
  }
```
 

 

val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes: The maximum number of bytes to pack into a single partition. It is configured through spark.sql.files.maxPartitionBytes (128MB by default).

val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes: The estimated cost of opening a file, expressed as the number of bytes that could be scanned in the same time. It is added to every file's size during planning, so files smaller than this value tend to be merged with other files into one partition, which helps avoid opening too many small files in separate tasks. It is configured through spark.sql.files.openCostInBytes (4MB by default).

val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism: The default parallelism of the SparkContext. As explained in https://blog.csdn.net/weixin_38670967/article/details/88819797, spark.default.parallelism = max(total number of cores used by all executors = num-executors * executor-cores, 2).

val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum: Note that totalBytes is the total size of the files plus openCostInBytes * #files. When reading many small files whose size is close to openCostInBytes, totalBytes is nearly twice the total size of the files.

val bytesPerCore = totalBytes / defaultParallelism: The number of bytes each core should process on average.

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)): This is the most important value, since it determines the final partitioning. It is bytesPerCore clamped to the range [openCostInBytes, defaultMaxSplitBytes].
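The three regimes of this formula are easier to see with numbers. The following is a minimal standalone sketch of the same arithmetic; `MaxSplitBytesSketch` and its parameter names are hypothetical, and the default arguments assume the stock values of 128MB and 4MB.

```scala
object MaxSplitBytesSketch {
  val MB: Long = 1024L * 1024

  // Same arithmetic as in createNonBucketedReadRDD, with file sizes passed in
  // directly instead of being read from selectedPartitions.
  def maxSplitBytes(
      fileSizes: Seq[Long],
      defaultParallelism: Int,
      maxPartitionBytes: Long = 128 * MB,
      openCostInBytes: Long = 4 * MB): Long = {
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    // Few small files, many cores: bytesPerCore is 0.5MB, raised to openCostInBytes.
    println(maxSplitBytes(Seq.fill(10)(1 * MB), defaultParallelism = 100) / MB)
    // Lots of data, few cores: bytesPerCore is 800MB, capped at maxPartitionBytes.
    println(maxSplitBytes(Seq.fill(1000)(4 * MB), defaultParallelism = 10) / MB)
    // In between: bytesPerCore itself, 100 * (60 + 4) / 100 = 64MB.
    println(maxSplitBytes(Seq.fill(100)(60 * MB), defaultParallelism = 100) / MB)
  }
}
```

The three calls print 4, 128 and 64 respectively, one for each branch of the min/max expression.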

 

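Let's look at two examples:

Suppose that defaultMaxSplitBytes = 128MB, openCostInBytes = 4MB, and we read 30 files of 2.5MB each (75MB in total) using 20 cores. Then totalBytes = 75 + 30*4 = 195MB, bytesPerCore = 195/20 = 9.75MB, and maxSplitBytes = Math.min(128MB, Math.max(4MB, 9.75MB)) = 9.75MB. A rough estimate of the number of partitions is 75/9.75 ≈ 8. Note, however, that the packing loop also charges openCostInBytes for every file it adds, so only 2 files fit into one partition (2.5 + 4 + 2.5 = 9MB ≤ 9.75MB, while a third file would exceed the limit), and the scan actually produces 15 partitions.

Suppose that defaultMaxSplitBytes = 128MB, openCostInBytes = 4MB, and we read 30 files of 25MB each (750MB in total, so totalBytes = 25*30 + 30*4 = 870MB) using 20 cores. Then bytesPerCore = 870/20 = 43.5MB and maxSplitBytes = Math.min(128MB, Math.max(4MB, 43.5MB)) = 43.5MB. A rough estimate is 750/43.5 ≈ 17 partitions, but each 25MB file is smaller than maxSplitBytes and two of them do not fit together (25 + 4 + 25 = 54MB > 43.5MB), so every file becomes its own partition and the scan produces 30 partitions.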

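Tip: The partition-size calculation above is simplified. In practice, each input file is first cut into pieces of at most maxSplitBytes; pieces smaller than maxSplitBytes are then merged into one partition until adding the next piece would exceed maxSplitBytes.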
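The cut-then-merge procedure can be simulated on plain file sizes. This is a minimal sketch that follows the structure of createNonBucketedReadRDD but drops block locations and partition values; it assumes every file is splittable (as Parquet is), and `ScanPlanningSketch` and `planPartitions` are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

object ScanPlanningSketch {
  val MB: Long = 1024L * 1024

  /** Returns, for each planned partition, the sizes in bytes of the pieces it holds. */
  def planPartitions(
      fileSizes: Seq[Long],
      defaultParallelism: Int,
      maxPartitionBytes: Long = 128 * MB,
      openCostInBytes: Long = 4 * MB): Seq[Seq[Long]] = {
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    val maxSplitBytes = math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))

    // Step 1: cut every file into pieces of at most maxSplitBytes, largest first.
    val pieces = fileSizes.flatMap { len =>
      (0L until len by maxSplitBytes).map(offset => math.min(maxSplitBytes, len - offset))
    }.sortBy(-_)

    // Step 2: pack the pieces, closing a partition when the next piece would overflow it.
    val partitions = ArrayBuffer.empty[Seq[Long]]
    val current = ArrayBuffer.empty[Long]
    var currentSize = 0L
    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toList
      current.clear()
      currentSize = 0L
    }
    pieces.foreach { size =>
      if (currentSize + size > maxSplitBytes) closePartition()
      currentSize += size + openCostInBytes // every piece is charged the open cost
      current += size
    }
    closePartition()
    partitions.toList
  }

  def main(args: Array[String]): Unit = {
    // 30 files of 2.5MB and 30 files of 25MB, each on 20 cores.
    println(planPartitions(Seq.fill(30)((2.5 * MB).toLong), defaultParallelism = 20).size)
    println(planPartitions(Seq.fill(30)(25 * MB), defaultParallelism = 20).size)
  }
}
```

For 30 files of 2.5MB on 20 cores the simulation plans 15 partitions of two files each, and for 30 files of 25MB it plans 30 single-file partitions, because the open cost charged per piece leaves no room for a second file.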

 

Conclusion: In the computation of maxSplitBytes, bytesPerCore strives to balance the data size across cores; defaultMaxSplitBytes is the upper bound on the size of each partition; and openCostInBytes helps avoid generating lots of partitions smaller than it. In other words, when #cores is very large, bytesPerCore falls below openCostInBytes, the balance of data size across cores can no longer be ensured, and some cores may receive no data.
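To make the last point concrete, the sketch below counts partitions for equally sized small files as the core count grows. It relies on a closed form derived from the packing loop (file number j, counted from 0, is accepted while j * (fileSize + openCostInBytes) + fileSize ≤ maxSplitBytes), which holds only when each file is no larger than maxSplitBytes; `IdleCoresSketch` and its parameters are hypothetical names, and the defaults assume 128MB and 4MB.

```scala
object IdleCoresSketch {
  val MB: Long = 1024L * 1024

  /** Partition count for n equally sized files, each no larger than maxSplitBytes. */
  def partitionsForEqualFiles(
      n: Int,
      fileSize: Long,
      cores: Int,
      maxPartitionBytes: Long = 128 * MB,
      openCostInBytes: Long = 4 * MB): Int = {
    val bytesPerCore = n * (fileSize + openCostInBytes) / cores
    val maxSplitBytes = math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
    require(fileSize <= maxSplitBytes, "closed form only covers files that are not split")
    // Number of files that fit before the packing loop closes the partition.
    val filesPerPartition = ((maxSplitBytes - fileSize) / (fileSize + openCostInBytes) + 1).toInt
    (n + filesPerPartition - 1) / filesPerPartition // ceiling division
  }

  def main(args: Array[String]): Unit = {
    val fileSize = (2.5 * MB).toLong
    for (cores <- Seq(10, 20, 200)) {
      val parts = partitionsForEqualFiles(30, fileSize, cores)
      println(s"cores=$cores partitions=$parts idleCores=${math.max(0, cores - parts)}")
    }
  }
}
```

With 30 files of 2.5MB, 10 cores get 10 partitions and 20 cores get 15, but at 200 cores maxSplitBytes bottoms out at openCostInBytes and only 30 partitions exist, so 170 cores have nothing to read.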