欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce原理(3): MapReduce的分片机制 getSplits()方法 源码解析

程序员文章站 2024-03-19 22:41:04
...

1、getSplits()方法在 FileInputFormat.addInputPath(job, path)中

  /** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
  	//用于记录分片开始的时间,最后会得到一个分片总用时,时间单位是纳秒
    StopWatch sw = new StopWatch().start();
    //用来计算分片大小
    //minSize 就是 1
    //maxSize 追到最下面可以发现其实就是long的最大值
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    //存放切片对象
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //得到路径下的所有文件
    List<FileStatus> files = listStatus(job);
    //遍历得到的文件
    for (FileStatus file: files) {
      //得到文件路径
      Path path = file.getPath();
      //获取文件大小
      long length = file.getLen();
      //如果文件大小不为0的话
      if (length != 0) {
      	//定义块数组,存放块在datanode上的位置
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
		//如果这个文件可以分片的话进行分片,zip、视频等不能进行分片
        if (isSplitable(job, path)) {
          //获取块大小,hadoop1默认是64M  hadoop2默认是128M  hadoop3默认是256M
          long blockSize = file.getBlockSize();
          //得到片大小
          //--> 最终决定出切片的大小(128M) --> blockSize值
      	  //Math.max(minSize, Math.min(maxSize, blockSize));这是实现
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
		  //获取文件大小
          long bytesRemaining = length;
          //文件大小/片大小>1.1 开始分片
          //例如  文件大小为260M  260/128=2.03>1.1 进入循环开始分片
          //132/128 <1.1  不再进行分片,循环结束
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                         blkLocations[blkIndex].getCachedHosts()));
            //文件大小 = 原文件大小 - 当前分片大小
            //260 -128 = 132 现在文件大小是132 MB
            bytesRemaining -= splitSize;
          }
		   //循环结束之后,只要文件大小不等于0 此时也会在切一个片
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //为零长度文件创建空主机数组
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // 保存文件数
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
	//返回携带着切片文件的集合
    return splits;
  }
相关标签: MapReduce 分片