【Hadoop】MapTask运行机制
程序员文章站
2022-06-30 11:46:42
...
MapTask运行机制
MapTask流程
Tips:
- Mr切片是逻辑切分,HDFS的分块是物理切分
- split与block是一对一的关系
- map阶段所有的排序都是针对key进行排序,不会针对value
流程分析:
- TextInputFormat读取文件,并调用getSplits()函数对文件进行逻辑分片,一个split对应一个block
- RecordReader读取一个split,调用一次map函数,并将结果输出到环形缓冲区
- 缓冲区内部对结果进行分区(partition),分区规则是key的hashcode对reducer个数进行取模
- 当缓冲区存储达到80%时,进行一次溢写(spil),在磁盘生成溢写文件,溢写时,每个分区内部按key进行快速排序(sort),如果设置了combiner,此时会对相同key的value进行聚合
- 所有计算完成后,对多个溢写文件按分区进行合并(merge),并对每个分区内按key进行归并排序
- 同时维护一份索引文件,记录各个分区的偏移量,map阶段的结果是分区内有序的
MapTask并行度(map任务的个数)
MapTask并行度决定了Map任务处理的并发度,从而影响到整个job的运行效率
思想:
移动计算比移动数据划算!!!
思考:
map任务是越多越好吗?哪些因素影响map任务的个数?
不是,如果一个文件仅比128M大一点点,也会当作一个split
split的个数
MapTask并行度的决定机制
- 一个split对应一个map任务,一个split对应一个block
- blocksize=splitsize=128M
为什么splitsize等于128M?
思考:
A文件300M,B文件100M,请问会有几个split?
split的计算是按文件逐个划分的,即先计算A文件3个split,再计算B文件1个split,而不是将A、B文件相加再除以128
源码小阅读
切片机制源码
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
* 翻译:生成文件的列表,并将他们装入FileSplits
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
//getFormatMinSplitSize() 1 getMinSplitSize(job) 0或1
//minsize 1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//maxsize Long类型的最大值
long maxSize = getMaxSplitSize(job);
// generate splits
//Split列表
List<InputSplit> splits = new ArrayList<InputSplit>();
//获取job中文件的状态信息
List<FileStatus> files = listStatus(job);
//遍历每个文件,由此可见split的生成是按单个文件来计算的
for (FileStatus file: files) {
Path path = file.getPath();
//获取文件长度
long length = file.getLen();
//判断文件是否为空
if (length != 0) {
BlockLocation[] blkLocations;
//获取文件block的位置信息
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//判断文件是否可切分
if (isSplitable(job, path)) {
//获取blocksize和splitsize
long blockSize = file.getBlockSize();
//Math.max(minSize, Math.min(maxSize, blockSize));
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
//SPLIT_SLOP=1.1,如果文件大小/splitsize>1.1,则切分一个split
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
//如果文件大小/splitsize>1.1,则单独作为一个split
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable 不可切分文件作为一个split
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
大数据学习+V:yp2595809239