MapReduce原理(3): MapReduce的分片机制 getSplits()方法 源码解析
程序员文章站
2024-03-19 22:41:04
...
1、getSplits()方法在 FileInputFormat.addInputPath(job, path)中
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
//用于记录分片开始的时间,最后会得到一个分片总用时,时间单位是纳秒
StopWatch sw = new StopWatch().start();
//用来计算分片大小
//minSize 就是 1
//maxSize 追到最下面可以发现其实就是long的最大值
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
//存放切片对象
List<InputSplit> splits = new ArrayList<InputSplit>();
//得到路径下的所有文件
List<FileStatus> files = listStatus(job);
//遍历得到的文件
for (FileStatus file: files) {
//得到文件路径
Path path = file.getPath();
//获取文件大小
long length = file.getLen();
//如果文件大小不为0的话
if (length != 0) {
//定义块数组,存放块在datanode上的位置
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
//如果这个文件可以分片的话进行分片,zip、视频等不能进行分片
if (isSplitable(job, path)) {
//获取块大小,hadoop1默认是64M hadoop2默认是128M hadoop3默认是256M
long blockSize = file.getBlockSize();
//得到片大小
//--> 最终决定出切片的大小(128M) --> blockSize值
//Math.max(minSize, Math.min(maxSize, blockSize));这是实现
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
//获取文件大小
long bytesRemaining = length;
//文件大小/片大小>1.1 开始分片
//例如 文件大小为260M 260/128=2.03>1.1 进入循环开始分片
//132/128 <1.1 不再进行分片,循环结束
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
//文件大小 = 原文件大小 - 当前分片大小
//260 -128 = 132 现在文件大小是132 MB
bytesRemaining -= splitSize;
}
//循环结束之后,只要文件大小不等于0 此时也会在切一个片
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//为零长度文件创建空主机数组
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// 保存文件数
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
//返回携带着切片文件的集合
return splits;
}