Inverted index example (multi-job chaining): two MapReduce jobs run in series
Requirement: given a large amount of text (documents, web pages), build a search index.
Input: the text files a.txt, b.txt and c.txt; output: the inverted index shown below.
Analysis:
The work is split across two chained MapReduce jobs. Expected output of the first job:
atguigu--a.txt 3
atguigu--b.txt 2
atguigu--c.txt 2
pingping--a.txt 1
pingping--b.txt 3
pingping--c.txt 1
ss--a.txt 2
ss--b.txt 1
ss--c.txt 1
Expected output of the second job:
atguigu c.txt-->2 b.txt-->2 a.txt-->3
pingping c.txt-->1 b.txt-->3 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2
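The raw input files are not listed in the original post; a set of three illustrative files that would produce exactly the counts above (an assumption, shown only to make the example concrete) could look like this:
a.txt:
atguigu pingping
atguigu ss
atguigu ss
b.txt:
atguigu pingping
atguigu pingping
pingping ss
c.txt:
atguigu ss
atguigu pingping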
1) First pass
(1) First pass: write OneIndexMapper
package com.lzz.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    String name;
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the name of the file this split belongs to
        FileSplit split = (FileSplit) context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1 Read one line, e.g. key = byte offset, value = "atguigu pingping"
        String line = value.toString();

        // 2 Split the line into words
        String[] words = line.split(" ");

        for (String word : words) {
            // 3 Concatenate word and file name
            k.set(word + "--" + name);

            // 4 Write out, e.g. key = "atguigu--a.txt", value = 1
            context.write(k, v);
        }
    }
}
(2) First pass: write OneIndexReducer
package com.lzz.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1 Sum the counts, e.g. for key "atguigu--a.txt" the values are 1, 1, 1
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        // 2 Write out, e.g. key = "atguigu--a.txt", value = 3
        context.write(key, new IntWritable(sum));
    }
}
(3) First pass: write OneIndexDriver
package com.lzz.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneIndexDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[] { "g:/input/index", "g:/output" };

        Configuration conf = new Configuration();

        // 1 Get the job instance
        Job job = Job.getInstance(conf);

        // 2 Set the jar by the driver class
        job.setJarByClass(OneIndexDriver.class);

        // 3 Associate the custom mapper and reducer
        job.setMapperClass(OneIndexMapper.class);
        job.setReducerClass(OneIndexReducer.class);

        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // The reducer only sums counts, so it can also serve as the combiner
        job.setCombinerClass(OneIndexReducer.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
(4) Inspect the first job's output
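With the default TextOutputFormat, the first job writes its result to a file such as part-r-00000 under g:/output, with key and value separated by a tab:
atguigu--a.txt	3
atguigu--b.txt	2
...
This tab-separated form is exactly what the second job reads as plain text, and the tab is what TwoIndexReducer later replaces with "-->".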
2) Second pass (the first job's output is the second job's input)
(1) Second pass: write TwoIndexMapper
package com.lzz.mapreduce.twoindex;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Note: the input types are not the first job's output types (Text, IntWritable).
// The second job reads the first job's result as plain text files again,
// so the mapper starts from LongWritable (offset) and Text (line).
public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text k = new Text();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input lines look like:
        // atguigu--a.txt	3
        // atguigu--b.txt	2
        String line = value.toString();
        String[] words = line.split("--");

        k.set(words[0]); // "atguigu"
        v.set(words[1]); // "a.txt	3" (tab-separated)

        context.write(k, v);
    }
}
(2) Second pass: write TwoIndexReducer
package com.lzz.mapreduce.twoindex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Input, e.g. for key "atguigu":
        //   a.txt	3
        //   b.txt	2
        //   c.txt	2

        // 1 Concatenate the values, turning each tab into "-->"
        StringBuilder sBuilder = new StringBuilder();
        for (Text value : values) {
            sBuilder.append(value.toString().replace("\t", "-->")).append("\t");
            // e.g. "c.txt-->2	b.txt-->2	a.txt-->3"
        }
        v.set(sBuilder.toString());

        // 2 Write out, e.g. key = "atguigu", value = "c.txt-->2	b.txt-->2	a.txt-->3"
        context.write(key, v);
    }
}
(3) Second pass: write TwoIndexDriver
package com.lzz.mapreduce.twoindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoIndexDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // The first job's output directory is the second job's input
        args = new String[] { "g:/output", "g:/output1" };

        Configuration conf = new Configuration();

        // 1 Get the job instance
        Job job = Job.getInstance(conf);

        // 2 Set the jar by the driver class
        job.setJarByClass(TwoIndexDriver.class);

        // 3 Associate the custom mapper and reducer
        job.setMapperClass(TwoIndexMapper.class);
        job.setReducerClass(TwoIndexReducer.class);

        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5 Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Running the two drivers one after the other produces the final inverted index shown in the expected second-pass output above.
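The two drivers above are submitted by hand in sequence. As a variation (a minimal sketch, not part of the original post), the two jobs can also be chained from one driver that submits the second job only after the first finishes successfully; ChainedIndexDriver is a hypothetical class name, and the paths match those used in the separate drivers:

package com.lzz.mapreduce.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.lzz.mapreduce.twoindex.TwoIndexMapper;
import com.lzz.mapreduce.twoindex.TwoIndexReducer;

// Hypothetical combined driver: runs the index job, then the merge job,
// submitting the second only when the first has completed successfully.
public class ChainedIndexDriver {

    public static void main(String[] args) throws Exception {
        Path input = new Path("g:/input/index"); // raw text files
        Path middle = new Path("g:/output");     // first job's output = second job's input
        Path output = new Path("g:/output1");    // final inverted index

        Configuration conf = new Configuration();

        // Job 1: count word occurrences per file ("atguigu--a.txt" -> 3)
        Job one = Job.getInstance(conf, "one-index");
        one.setJarByClass(ChainedIndexDriver.class);
        one.setMapperClass(OneIndexMapper.class);
        one.setReducerClass(OneIndexReducer.class);
        one.setOutputKeyClass(Text.class);
        one.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(one, input);
        FileOutputFormat.setOutputPath(one, middle);

        // Stop here if the first job fails
        if (!one.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: merge per-file counts into one line per word ("atguigu" -> "a.txt-->3 ...")
        Job two = Job.getInstance(conf, "two-index");
        two.setJarByClass(ChainedIndexDriver.class);
        two.setMapperClass(TwoIndexMapper.class);
        two.setReducerClass(TwoIndexReducer.class);
        two.setOutputKeyClass(Text.class);
        two.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(two, middle);
        FileOutputFormat.setOutputPath(two, output);

        System.exit(two.waitForCompletion(true) ? 0 : 1);
    }
}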