MapReduce Source Code Analysis (Part 2): How MapTask and LineRecordReader Read Files, Explained Through the Source
MapTask
When the ApplicationMaster runs a MapTask, the MapTask constructs a NewTrackingRecordReader object.
That object holds the field private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
and its constructor is invoked when the object is created, inside runNewMapper:
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
// Code 1: creating the RecordReader object `input` invokes the NewTrackingRecordReader constructor.
// That constructor in turn calls createRecordReader(split, taskContext),
// which is where the record delimiter is determined; the source is shown below.
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
// Code 2: calls initialize, which reaches LineRecordReader and sets the split's start and end positions; source below
input.initialize(split, mapperContext);
// Code 3: after initialization, reading the data begins; see the run method below
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
NewTrackingRecordReader
The constructor, which creates the underlying RecordReader:
// Called from Code 1
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
TaskReporter reporter,
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
throws InterruptedException, IOException {
this.reporter = reporter;
this.inputRecordCounter = reporter
.getCounter(TaskCounter.MAP_INPUT_RECORDS);
this.fileInputByteCounter = reporter
.getCounter(FileInputFormatCounter.BYTES_READ);
List <Statistics> matchedStats = null;
if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
.getPath(), taskContext.getConfiguration());
}
fsStats = matchedStats;
long bytesInPrev = getInputBytes(fsStats);
// Code 1.1: calls createRecordReader on the InputFormat; in practice this dispatches to the subclass TextInputFormat's createRecordReader
this.real = inputFormat.createRecordReader(split, taskContext);
long bytesInCurr = getInputBytes(fsStats);
fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
TextInputFormat
public class TextInputFormat extends FileInputFormat<LongWritable, Text>
@Override
// Called from Code 1.1
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context) {
// textinputformat.record.delimiter sets a custom record delimiter; by default a newline ends the line
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
// return a RecordReader instance
return new LineRecordReader(recordDelimiterBytes);
}
textinputformat.record.delimiter is the terminator for a single record: when the characters it contains are encountered, reading of the current line ends. A custom terminator can be set via Configuration's set() method. If textinputformat.record.delimiter is not set, Hadoop uses CR, LF, or CRLF as the terminator, which can be seen in LineReader's readDefaultLine method.
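A minimal sketch of setting a custom delimiter on a job; the class name, job name, and the "###" delimiter are illustrative assumptions, and everything outside the conf.set call follows the usual job-setup pattern:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CustomDelimiterDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Every record now ends at "###" instead of CR/LF/CRLF.
        conf.set("textinputformat.record.delimiter", "###");
        Job job = Job.getInstance(conf, "custom-delimiter-demo");
        // ... set mapper, input/output formats and paths as usual ...
    }
}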
Before the MapTask executes the run() method, the initialize method (NewTrackingRecordReader.initialize) is executed, as follows:
@Override
// Called from Code 2
public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
org.apache.hadoop.mapreduce.TaskAttemptContext context
) throws IOException, InterruptedException {
long bytesInPrev = getInputBytes(fsStats);
// Code 2.1: calls LineRecordReader's initialize method
real.initialize(split, context);
long bytesInCurr = getInputBytes(fsStats);
fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
The RecordReader returned earlier, stored in real, then calls its own initialize method: real.initialize(split, context);
The LineRecordReader class
Purpose: reads each line's byte offset as the key and the line's content as the value.
public class LineRecordReader extends RecordReader<LongWritable, Text>
Main fields:
public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength";
private long start; // start position of the split
private long pos; // number of bytes the line reader has consumed so far
private long end; // end position of the split
private SplitLineReader in; // line reader stream
private FSDataInputStream fileIn; // input stream bound to the file
private Seekable filePosition; // tracks the current position in the stream
private int maxLineLength; // maximum line length
private LongWritable key; // key: byte offset of the line
private Text value; // value: text of the line
private boolean isCompressedInput;
private Decompressor decompressor;
private byte[] recordDelimiterBytes; // record delimiter
The initialize method
initialize mainly computes the start and end positions of the split and opens the input stream from which key-value pairs will be read; it also handles the case where the split is compressed.
// Called from Code 2.1: initializes the split's start and end positions; pos will serve as the key's offset
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
// cast the InputSplit to a FileSplit
FileSplit split = (FileSplit) genericSplit;
// fetch the job's Hadoop Configuration (held here in a variable named job)
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
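The line skipped at the end of initialize() is the counterpart of the extra line read in nextKeyValue(): a record that straddles a split boundary is read in full by the earlier split and discarded by the later one. The following standalone sketch, with a placeholder local file path and an arbitrary split point, drives LineRecordReader over two hand-made splits of the same file to observe this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class SplitBoundaryDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("file:///tmp/lines.txt");          // placeholder: any local text file
        long len = FileSystem.get(file.toUri(), conf).getFileStatus(file).getLen();
        long cut = len / 2;                                      // arbitrary split point in bytes
        FileSplit first = new FileSplit(file, 0, cut, new String[0]);
        FileSplit second = new FileSplit(file, cut, len - cut, new String[0]);
        for (FileSplit split : new FileSplit[] { first, second }) {
            LineRecordReader reader = new LineRecordReader();
            reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
            // The second reader already skipped its partial first line in initialize();
            // the line crossing the cut point is emitted exactly once, by the first reader.
            while (reader.nextKeyValue()) {
                System.out.println("split@" + split.getStart() + "  key=" + reader.getCurrentKey()
                        + "  value=" + reader.getCurrentValue());
            }
            reader.close();
        }
    }
}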
After input has been initialized, the mapper starts reading data by executing the mapper.run(mapperContext) method:
// Called from Code 3
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
// Code 3.1: checks whether there is another record; this ends up calling LineRecordReader's nextKeyValue method
while (context.nextKeyValue()) {
// call map, passing in the key and value that were just read
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
This is the nextKeyValue method in LineRecordReader:
// Called from Code 3.1: reads a <K,V> pair, assigning pos to the key and the line just read to the value
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
We usually extend Mapper and override the map method; the map call above then dispatches to our overridden version.
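As a minimal sketch (the class name, tokenization, and output types are illustrative, not part of the source walked through above), such a Mapper receives the byte offset produced by LineRecordReader as its key and one line of text as its value:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (word, 1) for every token on the line; the input key is the byte offset
// produced by LineRecordReader, the value is one line of text.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}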
Overall flow:
- The ApplicationMaster starts a MapTask. The MapTask creates an InputFormat object (TextInputFormat by default) and calls its createRecordReader(InputSplit split, TaskAttemptContext context) method. This method is where the record delimiter can be customized (by default a newline ends a line), and it returns a RecordReader, by default a LineRecordReader.
- The split is initialized, and the information (start, end, pos, i.e. the number of bytes the line reader has consumed, and so on) is stored on the newly created LineRecordReader object.
- mapper.run(mapperContext) in the MapTask is executed and reading begins.
- Inside run, context.nextKeyValue() ends up calling LineRecordReader's nextKeyValue method, which assigns pos to the key as the byte offset and the text of the current line to the value, and returns a boolean indicating whether another record exists.
- If there is data, the <K,V> pair is passed to the map method.
- We usually extend Mapper and override map; its parameters are the incoming <K,V> pair, where key is the offset and value is the line of text (a minimal driver wiring all of this together is sketched below).
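A driver sketch under the same illustrative assumptions as the WordCountMapper above (the job name, the paths taken from args, and the stock IntSumReducer are placeholders); it relies on TextInputFormat and therefore exercises exactly the LineRecordReader path traced in this article:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "wordcount-demo");
        job.setJarByClass(WordCountDriver.class);
        // TextInputFormat is the default; set explicitly here for clarity.
        // It hands each split to a LineRecordReader as traced above.
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}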