Hadoop Series (4): Implementing an Inverted Index with MapReduce
1. Inverted Index
Inverted indexes are widely used in information retrieval. Figure 1 illustrates Boolean retrieval: for each word, the index records which documents contain it. Given the query blue, it returns the IDs of every document in which blue appears. The index maps words to documents, rather than documents to their words, because the vocabulary of common terms is far smaller than the number of documents.
Figure 1: The Boolean inverted-index process
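Before turning to Hadoop, a minimal in-memory sketch in plain Java illustrates the idea behind Figure 1. The corpus below is hypothetical sample data; the index maps each word to the sorted set of document IDs containing it, and a Boolean query is then a single map lookup.

import java.util.*;

public class TinyInvertedIndex {
    public static void main(String[] args) {
        // Hypothetical sample corpus: docid -> document text
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("doc1", "the sky is blue");
        docs.put("doc2", "the sun is bright");
        docs.put("doc3", "blue ocean blue sky");

        // Build the index: word -> sorted set of docids containing that word
        Map<String, Set<String>> index = new HashMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (String word : doc.getValue().split("\\s+")) {
                index.computeIfAbsent(word, k -> new TreeSet<>()).add(doc.getKey());
            }
        }

        // Boolean retrieval: the posting list for "blue" is one lookup
        System.out.println(index.getOrDefault("blue", Collections.emptySet()));
        // prints [doc1, doc3]
    }
}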
2. MapReduce Code
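The job follows the word-count skeleton with two adjustments: the mapper aggregates a word's count within each record before emitting (in-mapper combining), and every emitted value carries the document ID together with that count, so the reducer can merge counts per document and concatenate them into a posting list.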
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
    public static class MyMap extends Mapper<Object, Text, Text, Text> {
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            HashMap<String, Integer> dict = new HashMap<>();
            // Derive the document id from the input file name (the part before the first dot)
            String docid = ((FileSplit) context.getInputSplit()).getPath().getName().split("\\.")[0];
            StringTokenizer itr = new StringTokenizer(value.toString());
            // In-mapper combining: count each word's occurrences within this record
            while (itr.hasMoreTokens()) {
                String tmp_key = itr.nextToken();
                dict.put(tmp_key, dict.getOrDefault(tmp_key, 0) + 1);
            }
            // Emit one <word, docid:count> pair per distinct word in this record
            for (Map.Entry<String, Integer> entry : dict.entrySet()) {
                word.set(entry.getKey());
                context.write(word, new Text(docid + ":" + entry.getValue()));
            }
        }
    }
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Accumulates this word's total count per document across all values;
            // it must persist for the whole loop, not be cleared per value
            HashMap<String, Integer> dict = new HashMap<>();
            for (Text tempText : values) {
                String[] attr = tempText.toString().trim().split(":");
                String docid = attr[0];
                int n = Integer.parseInt(attr[1]);
                dict.put(docid, dict.getOrDefault(docid, 0) + n);
            }
            // Build the posting list: docid:count entries separated by tabs
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Integer> entry : dict.entrySet()) {
                result.append(entry.getKey()).append(":").append(entry.getValue()).append("\t");
            }
            context.write(key, new Text(result.toString()));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: InvertedIndex <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Inverted Index");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(MyMap.class);
        // Reusing the reducer as a combiner is safe here only because each input
        // split belongs to a single file, so all values for a key within one map
        // task share the same docid and the combined value still parses as docid:count
        job.setCombinerClass(MyReduce.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
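A quick way to try the job, assuming Hadoop is on the PATH and the class is packaged into invertedindex.jar (the jar and HDFS paths here are hypothetical):

hadoop jar invertedindex.jar InvertedIndex /user/hadoop/docs /user/hadoop/index-out

If /user/hadoop/docs held two sample files, doc1.txt containing "the sky is blue" and doc2.txt containing "blue ocean blue sky", the output for blue would be a line of the form blue<TAB>doc1:1<TAB>doc2:2. The order of docid:count entries within a line is not deterministic, since the reducer collects them in a HashMap.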