Hadoop--MapReduce6--Controlling Input and Output Formats
During MapReduce processing, the map phase processes the input data and produces a series of key-value pairs, and the reduce phase then aggregates the values that share the same key. Map tasks run distributed across different machines, and each map task's output is stored locally on the machine that ran it; every reduce task must first download the map output files to its own machine. The format of the files written between the map and reduce phases therefore matters: a compact format can greatly reduce the amount of data transferred.
When submitting a job, the output format can be set through the Job object's setOutputFormatClass method:
/**
* Set the {@link OutputFormat} for the job.
* @param cls the <code>OutputFormat</code> to use
* @throws IllegalStateException if the job is submitted
*/
public void setOutputFormatClass(Class<? extends OutputFormat> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
OutputFormat.class);
}
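For example, in a job driver (a minimal usage sketch; the full drivers later in this article do the same thing inside main()):

    Job job = Job.getInstance(new Configuration());
    // This must be called while the job is still being defined, i.e. before it is
    // submitted; afterwards ensureState() throws IllegalStateException.
    job.setOutputFormatClass(SequenceFileOutputFormat.class);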
Any format that is set here must be a subclass of the abstract OutputFormat class:
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputFormat<K, V> {
/**
* Get the {@link RecordWriter} for the given task.
*
* @param context the information about the current task.
* @return a {@link RecordWriter} to write the output for the job.
* @throws IOException
*/
public abstract RecordWriter<K, V>
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException;
/**
* Check for validity of the output-specification for the job.
*
* <p>This is to validate the output specification for the job when it
* is submitted. Typically checks that it does not already exist,
* throwing an exception when it already exists, so that output is not
* overwritten.</p>
*
* @param context information about the job
* @throws IOException when output should not be attempted
*/
public abstract void checkOutputSpecs(JobContext context
) throws IOException,
InterruptedException;
/**
* Get the output committer for this output format. This is responsible
* for ensuring the output is committed correctly.
* @param context the task context
* @return an output committer
* @throws IOException
* @throws InterruptedException
*/
public abstract
OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException, InterruptedException;
}
You can subclass this abstract class to implement a custom format (see the sketch below). The implementations Hadoop ships with include TextOutputFormat, SequenceFileOutputFormat, MapFileOutputFormat, NullOutputFormat, and DBOutputFormat.
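If none of the built-in formats fits, a custom format is just another subclass. The sketch below is a hypothetical example (UpperCaseTextOutputFormat is not part of the original article): by extending FileOutputFormat, checkOutputSpecs() and getOutputCommitter() are inherited, so only getRecordWriter() has to be provided.

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Hypothetical example: writes each record as "KEY<TAB>value", upper-casing the key.
    public class UpperCaseTextOutputFormat extends FileOutputFormat<Text, IntWritable> {

        @Override
        public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            // FileOutputFormat chooses a per-task file name such as part-r-00000 for us.
            Path file = getDefaultWorkFile(context, ".txt");
            FileSystem fs = file.getFileSystem(context.getConfiguration());
            final FSDataOutputStream out = fs.create(file, false);

            return new RecordWriter<Text, IntWritable>() {
                @Override
                public void write(Text key, IntWritable value) throws IOException, InterruptedException {
                    out.writeBytes(key.toString().toUpperCase() + "\t" + value.get() + "\n");
                }

                @Override
                public void close(TaskAttemptContext ctx) throws IOException, InterruptedException {
                    out.close();
                }
            };
        }
    }

It would then be used exactly like the built-in formats: job.setOutputFormatClass(UpperCaseTextOutputFormat.class);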
TextOutputFormat is the default. Below we test the SequenceFileOutputFormat format.
In the first step of the inverted-index program, set the output format when submitting the job:
job.setOutputFormatClass(SequenceFileOutputFormat.class);
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class IndexStepOne {

    public static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        String fileName = null;

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Remember which input file this map task is processing.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            fileName = inputSplit.getPath().getName();
        }

        // Emits pairs such as <hello-a.txt, 1>
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String w : words) {
                // Use "word-filename" as the key and 1 as the value.
                context.write(new Text(w + "-" + fileName), new IntWritable(1));
            }
        }
    }

    public static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Sum the occurrences of each "word-filename" key.
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(IndexStepOne.class);

        job.setMapperClass(IndexStepOneMapper.class);
        job.setReducerClass(IndexStepOneReducer.class);
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Write the output as a SequenceFile instead of the default text format.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\index\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\index\\out3"));

        job.waitForCompletion(true);
    }
}
The file finally written by the reducer is a SequenceFile; opened as plain text it looks like this:
SEQ org.apache.hadoop.io.Text org.apache.hadoop.io.IntWritable (header, followed by unprintable binary sync and value bytes)
c++-c.txt hello-a.txt hello-b.txt hello-c.txt
jack-b.txt
java-c.txt jerry-b.txt jerry-c.txt
jim-a.txt
jim-b.txt kitty-a.txt kitty-b.txt
rose-a.txt
tom-a.txt
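Because the file is a SequenceFile, viewing it as raw text only shows the header and the key strings mixed with binary bytes. To dump it in readable form you can run hadoop fs -text on it, or use a small reader like the sketch below (an assumption, not part of the original article: with one reduce task, step one's output file under out3 is named part-r-00000).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class DumpIndexStepOneOutput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Assumed path: the single reduce task of step one writes part-r-00000 under out3.
            Path path = new Path("F:\\hadoop-2.8.1\\data\\index\\out3\\part-r-00000");
            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
                Text key = new Text();
                IntWritable value = new IntWritable();
                while (reader.next(key, value)) {
                    // Prints each key and its count, e.g. "hello-a.txt" followed by a number.
                    System.out.println(key + "\t" + value);
                }
            }
        }
    }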
Step two: set the format for reading these files. Note that the map task's input key/value types are no longer LongWritable and Text; they are the key/value types written by the previous step, Text and IntWritable. In other words, once the input format is changed, the mapper no longer reads the data line by line.
job.setInputFormatClass(SequenceFileInputFormat.class);
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexStepTwo {

    // The input is a SequenceFile written as <Text, IntWritable>, so the mapper's
    // input types are Text/IntWritable rather than the usual LongWritable/Text.
    public static class IndexStepTwoMapper extends Mapper<Text, IntWritable, Text, Text> {

        @Override
        protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
            // key looks like "hello-a.txt"; the count carried in value is not used here.
            String[] split = key.toString().split("-");
            context.write(new Text(split[0]), new Text(split[1].replaceAll("\t", "-->")));
        }
    }

    public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text> {

        // One group of values, e.g. key = hello, values = a.txt, b.txt, c.txt
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString()).append("\t");
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(IndexStepTwo.class);

        job.setMapperClass(IndexStepTwoMapper.class);
        job.setReducerClass(IndexStepTwoReducer.class);
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Read the SequenceFile produced by step one instead of plain text lines.
        job.setInputFormatClass(SequenceFileInputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\hadoop-2.8.1\\data\\index\\out3"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop-2.8.1\\data\\index\\out4"));

        job.waitForCompletion(true);
    }
}
Final result:
c++ c.txt
hello a.txt b.txt c.txt
jack b.txt
java c.txt
jerry b.txt c.txt
jim a.txt b.txt
kitty a.txt b.txt
rose a.txt
tom a.txt