欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【Hadoop】MapTask运行机制

程序员文章站 2022-06-30 11:46:42
...

MapTask运行机制

MapTask流程

【Hadoop】MapTask运行机制

Tips:

  • Mr切片是逻辑切分,HDFS的分块是物理切分
  • split与block是一对一的关系
  • map阶段所有的排序都是针对key进行排序,不会针对value

流程分析:

  1. TextInputFormat读取文件,并调用getSplits()函数对文件进行逻辑分片,一个split对应一个block
  2. RecordReader读取一个split,调用一次map函数,并将结果输出到环形缓冲区
  3. 缓冲区内部对结果进行分区(partition),分区规则是key的hashcode对reducer个数进行取模
  4. 当缓冲区存储达到80%时,进行一次溢写(spil),在磁盘生成溢写文件,溢写时,每个分区内部按key进行快速排序(sort),如果设置了combiner,此时会对相同key的value进行聚合
  5. 所有计算完成后,对多个溢写文件按分区进行合并(merge),并对每个分区内按key进行归并排序
  6. 同时维护一份索引文件,记录各个分区的偏移量,map阶段的结果是分区内有序的

MapTask并行度(map任务的个数)

MapTask并行度决定了Map任务处理的并发度,从而影响到整个job的运行效率
思想:
移动计算比移动数据划算!!!
思考:
map任务是越多越好吗?哪些因素影响map任务的个数?
不是,如果一个文件仅比128M大一点点,也会当作一个split
split的个数


MapTask并行度的决定机制

  1. 一个split对应一个map任务,一个split对应一个block
  2. blocksize=splitsize=128M

为什么splitsize等于128M?
【Hadoop】MapTask运行机制

思考:
A文件300M,B文件100M,请问会有几个split?

split的计算是按文件逐个划分的,即先计算A文件3个split,再计算B文件1个split,而不是将A、B文件相加再除以128

源码小阅读

切片机制源码

  /** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   * 翻译:生成文件的列表,并将他们装入FileSplits
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    //getFormatMinSplitSize() 1 getMinSplitSize(job) 0或1
    //minsize 1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    //maxsize Long类型的最大值
    long maxSize = getMaxSplitSize(job);

    // generate splits
    //Split列表
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //获取job中文件的状态信息
    List<FileStatus> files = listStatus(job);
    //遍历每个文件,由此可见split的生成是按单个文件来计算的
    for (FileStatus file: files) {
      Path path = file.getPath();
      //获取文件长度
      long length = file.getLen();
      //判断文件是否为空
      if (length != 0) {
        BlockLocation[] blkLocations;
        //获取文件block的位置信息
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //判断文件是否可切分
        if (isSplitable(job, path)) {
          //获取blocksize和splitsize
          long blockSize = file.getBlockSize();
          //Math.max(minSize, Math.min(maxSize, blockSize));
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          //SPLIT_SLOP=1.1,如果文件大小/splitsize>1.1,则切分一个split
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          //如果文件大小/splitsize>1.1,则单独作为一个split
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable 不可切分文件作为一个split
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }

大数据学习+V:yp2595809239
【Hadoop】MapTask运行机制