MapReduce--倒排索引
文档倒排算法简介
Inverted Index(倒排索引)是目前几乎所有支持全文检索的搜索引擎都要依赖的一个数据结构。基于索引结构,给出一个词(term),能取得含有这个term的文档列表(the list of documents)
Web Search中的问题主要分为三部分:
- crawling(gathering web content) ,网页爬虫,收集数据
- indexing(construction of the inverted index) ,根据大量数据构建倒排索引结构
- retrieval(ranking documents given a query),根据一个搜索单词进行索引并对结果进行排序,比如可以根据词频多少来排
crawling和indexing都是离线的,retrieval是在线、实时的。
此处有个问题,索引结构会如何进行存储呢?
给定一个单词,如何快速得到结果呢?
一般可以采用两种存储方式,一种是hash链表,还有一种则是B(B+)树。
说起存储我就想到我同学面试时被问到一个问题:a~z26个单词如何存储能快速索引?
(⊙﹏⊙)b居然是26叉树,好丧心病狂啊!
基本的倒排索引结构
实验任务
请实现课堂上介绍的“带词频属性的文档倒排算法”。
在统计词语的倒排索引时,除了要输出带词频属性的倒排索引,还请计算每个词语的“平均
提及次数”(定义见下)并输出。
“平均提及次数”在这里定义为:
平均提及次数= 词语在全部文档中出现的频数总和/ 包含该词语的文档数
假如文档集中有四本小说:A、B、C、D。词语“江湖”在文档A 中出现了100 次,在文档B
中出现了200 次,在文档C 中出现了300 次,在文档D 中没有出现。则词语“江湖”在该文
档集中的“平均提及次数”为(100 + 200 + 300) / 3 = 200。
输出格式
对于每个词语,输出两个键值对,两个键值对的格式如下:
[词语] \TAB 词语1:词频, 词语2:词频, 词语3:词频, …, 词语100:词频
[词语] \TAB 平均提及次数
下图展示了输出文件的一个片段(图中内容仅为格式示例):
设计
倒排索引可以看做是wordcount的拓展,它需要统计一个单词在多个文件中出现的次数,那么它的Mapper和Reducer该如何设计呢?
很自然地我们会想到
Mapper:
- 对于文件file中任一word,
- Key = word, Value = fileName + 1.
Reduer:
- 对于输入Key, Iterable(Text) Values,
- 统计Values中每个Value,记录出现的fileName以及频数.
这里有个问题,它需要假定对于一个相同的Key,Mapper给出的输出
design trick: value-to-key conversion
Value到Key的转换
比如说对于原来的(term, (docid, tf))可以将value中的docid放到key中从而得到
新的键值对((term, docid), tf)。
这样具有相同key值的键值对数目就降低啦!
关于Mapper里边的代码我遇到两个问题:
1.用空格 ” “来对line做分词,在最后的输出结果里边会出现空白单词,很奇怪,虽然我在输出的时候加了token为” “或”\t”的时候都不输出,但是最后结果里边还是有空白单词,匪夷所思诶。
2.Mapper的输出如果采用((term:docid), tf)的形式,使用“:”来分隔term和docid,那么在Combiner里边如果我使用”:”来分隔key(也就是下边错误的Mapper方式),那么得到的String个数有时候长度居然<2,所以我现在使用”->”来进行分隔。
public static class InverseIndexMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String line = value.toString();
String[] strTokens = line.split(" ");
FileSplit inputSplit = (FileSplit)context.getInputSplit();
Path path = inputSplit.getPath();
String pathStr = path.toString();
int index = pathStr.lastIndexOf("/");
String strFileName = pathStr.substring(index + 1);
for (String token : strTokens){
if (token != " " && token != "\t"){
context.write(new Text(token + "->" + strFileName), new Text("1"));
}
}
}
}
Combiner的使用
为了减少Mapper的输出,从而降低Mapper到Reducer的传输开销以及存储开销,使用Combiner是个好方法,相当于是在每个Mapper结束之后先进行一次Reducer将结果汇总一下。
这里是将相同文档的相同term的词频统计一下。
我看到过有这样一种处理方法,它为了Reducer方便处理,所以将Mapper的输出从((term, docid), tf)变为((term, (docid, tf))。
public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String strKey = key.toString();
String[] tokens = strKey.split("->");
int freq = 0;
for (Text value : values){
freq++;
}
context.write(new Text(tokens[0]), new Text(tokens[1] + "->" + freq));
}
}
正确的如下
public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
int freq = 0;
for (Text value : values){
freq++;
}
context.write(key, new Text("" + freq));
}
}
Partitioner的设计
因为value-to-key conversion,Mapper的输出中key变为了(term, docid)。如果采用默认的Partitioner,那么具有相同term,不同docid的项很可能会被划分到不同的Reducer,这与初衷是违背的啊,所以需要自定义一个Partitioner,用key中的term作为划分的依据!
这里有个小问题,如果我采用Combiner中的错误方式,将Combiner的输出重新变化为了(term, (docid, tf)),那么是否还需要自定义Partitioner了呢?
答案是需要的,看来Partitioner的判断依据不是Combiner的输出啦!
public static class InverseIndexPartitioner extends HashPartitioner<Text, Text>{
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
// TODO Auto-generated method stub
String strKey = key.toString();
String[] tokens = strKey.split("->");
return super.getPartition(new Text(tokens[0]), value, numReduceTasks);
}
}
Reducer的设计
根据Mapper传来的输出( (term, docid), tf),这里需要进行的处理便是将具有相同term的键值对聚集在一起,并重组成( term , (docid1:tf1, docid2:tf2, …) )的输出形式。
- 采用静态变量strWord来记录上一次reduce过程中的term ;
- 采用静态变量map记录静态变量strWord对应的docid:tf对;
- 处理reduce过程时,首先将key分割出term以及fileName,
- 判断term是否与strWord相等,
- 如果相等,首先累计额values,得到docid,tf对之后加入map;
- 否则将strWord,map输出,并清空map,strWord赋值为term,处理当前docid,tf,并加入map;
- 因为最后一次reduce过程不可能将它自己的数据输出,所以需要重载cleanup函数在里边进行输出
- 还有一点需要注意,String的相等判断用“==”是不行的哦,如果用了“==”而不是“equals”,会出现什么后果呢?
public static class InverseIndexReducer extends Reducer<Text, Text, Text, Text>{
static Map<String, Integer> map = new HashMap<String, Integer>();
static String strWord = null;
@Override
protected void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] tokens = key.toString().split("->");
if (strWord == null){
strWord = tokens[0];
}
if (strWord.equals(tokens[0])){
String strFileName = tokens[1];
int freq = 0;
for (Text value : values){
freq += Integer.parseInt(value.toString());
}
map.put(strFileName, freq);
} else {
String strNewValue = "";
double aveFreq = 0;
for (Map.Entry<String, Integer> entry : map.entrySet()){
strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
aveFreq += (double)entry.getValue();
}
aveFreq /= (double)map.size();
Text newKey = new Text(strWord);
map.clear();
context.write(newKey, new Text(strNewValue));
context.write(newKey, new Text("" + aveFreq));
strWord = tokens[0];
String strFileName = tokens[1];
int freq = 0;
for (Text value : values){
freq += Integer.parseInt(value.toString());
}
map.put(strFileName, freq);
}
}
@Override
protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String strNewValue = "";
double aveFreq = 0;
for (Map.Entry<String, Integer> entry : map.entrySet()){
strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
aveFreq += (double)entry.getValue();
}
aveFreq /= (double)map.size();
Text newKey = new Text(strWord);
map.clear();
context.write(newKey, new Text(strNewValue));
context.write(newKey, new Text("" + aveFreq));
super.cleanup(context);
}
}
main函数
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Job job = new Job(conf, "InverseIndex");
job.setJarByClass(InverseIndex.class);
job.setNumReduceTasks(4);
job.setMapperClass(InverseIndexMapper.class);
job.setCombinerClass(InverseIndexCombiner.class);
job.setPartitionerClass(InverseIndexPartitioner.class);
job.setReducerClass(InverseIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
运行结果
下一篇: 记一次OOM的排查