欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce--倒排索引

程序员文章站 2022-04-28 18:11:29
...

文档倒排算法简介

Inverted Index(倒排索引)是目前几乎所有支持全文检索的搜索引擎都要依赖的一个数据结构。基于索引结构,给出一个词(term),能取得含有这个term的文档列表(the list of documents) 
Web Search中的问题主要分为三部分:

  1. crawling(gathering web content) ,网页爬虫,收集数据
  2. indexing(construction of the inverted index) ,根据大量数据构建倒排索引结构
  3. retrieval(ranking documents given a query),根据一个搜索单词进行索引并对结果进行排序,比如可以根据词频多少来排 
    crawling和indexing都是离线的,retrieval是在线、实时的。 
    此处有个问题,索引结构会如何进行存储呢? 
    给定一个单词,如何快速得到结果呢? 
    一般可以采用两种存储方式,一种是hash链表,还有一种则是B(B+)树。 
    说起存储我就想到我同学面试时被问到一个问题:a~z26个单词如何存储能快速索引? 
    (⊙﹏⊙)b居然是26叉树,好丧心病狂啊!

基本的倒排索引结构

MapReduce--倒排索引

实验任务

请实现课堂上介绍的“带词频属性的文档倒排算法”。 
在统计词语的倒排索引时,除了要输出带词频属性的倒排索引,还请计算每个词语的“平均 
提及次数”(定义见下)并输出。 
“平均提及次数”在这里定义为: 
平均提及次数= 词语在全部文档中出现的频数总和/ 包含该词语的文档数 
假如文档集中有四本小说:A、B、C、D。词语“江湖”在文档A 中出现了100 次,在文档B 
中出现了200 次,在文档C 中出现了300 次,在文档D 中没有出现。则词语“江湖”在该文 
档集中的“平均提及次数”为(100 + 200 + 300) / 3 = 200。

输出格式 
对于每个词语,输出两个键值对,两个键值对的格式如下: 
[词语] \TAB 词语1:词频, 词语2:词频, 词语3:词频, …, 词语100:词频 
[词语] \TAB 平均提及次数 
下图展示了输出文件的一个片段(图中内容仅为格式示例): 
MapReduce--倒排索引

设计

倒排索引可以看做是wordcount的拓展,它需要统计一个单词在多个文件中出现的次数,那么它的Mapper和Reducer该如何设计呢? 
很自然地我们会想到 
Mapper:

  • 对于文件file中任一word,
  • Key = word, Value = fileName + 1.

Reduer:

  • 对于输入Key, Iterable(Text) Values,
  • 统计Values中每个Value,记录出现的fileName以及频数.

这里有个问题,它需要假定对于一个相同的Key,Mapper给出的输出

design trick: value-to-key conversion

Value到Key的转换 
比如说对于原来的(term, (docid, tf))可以将value中的docid放到key中从而得到 
新的键值对((term, docid), tf)。 
这样具有相同key值的键值对数目就降低啦!

关于Mapper里边的代码我遇到两个问题: 
1.用空格 ” “来对line做分词,在最后的输出结果里边会出现空白单词,很奇怪,虽然我在输出的时候加了token为” “或”\t”的时候都不输出,但是最后结果里边还是有空白单词,匪夷所思诶。 
2.Mapper的输出如果采用((term:docid), tf)的形式,使用“:”来分隔term和docid,那么在Combiner里边如果我使用”:”来分隔key(也就是下边错误的Mapper方式),那么得到的String个数有时候长度居然<2,所以我现在使用”->”来进行分隔。

public static class InverseIndexMapper extends Mapper<LongWritable, Text, Text, Text>{

        @Override
        protected void map(LongWritable key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String line = value.toString();
            String[] strTokens = line.split(" ");
            FileSplit inputSplit = (FileSplit)context.getInputSplit();
            Path path = inputSplit.getPath();
            String pathStr = path.toString();
            int index = pathStr.lastIndexOf("/");
            String strFileName = pathStr.substring(index + 1); 
            for (String token : strTokens){
                if (token != " " && token != "\t"){
                    context.write(new Text(token + "->" + strFileName), new Text("1"));
                }
            }
        }
    }

Combiner的使用

为了减少Mapper的输出,从而降低Mapper到Reducer的传输开销以及存储开销,使用Combiner是个好方法,相当于是在每个Mapper结束之后先进行一次Reducer将结果汇总一下。 
这里是将相同文档的相同term的词频统计一下。 
我看到过有这样一种处理方法,它为了Reducer方便处理,所以将Mapper的输出从((term, docid), tf)变为((term, (docid, tf))。

public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String strKey = key.toString();
            String[] tokens = strKey.split("->");
            int freq = 0;
            for (Text value : values){
                freq++;
            }
            context.write(new Text(tokens[0]), new Text(tokens[1] + "->" + freq));
        }
    }

正确的如下

public static class InverseIndexCombiner extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int freq = 0;
            for (Text value : values){
                freq++;
            }
            context.write(key, new Text("" + freq));
        }
    }

Partitioner的设计

因为value-to-key conversion,Mapper的输出中key变为了(term, docid)。如果采用默认的Partitioner,那么具有相同term,不同docid的项很可能会被划分到不同的Reducer,这与初衷是违背的啊,所以需要自定义一个Partitioner,用key中的term作为划分的依据! 
这里有个小问题,如果我采用Combiner中的错误方式,将Combiner的输出重新变化为了(term, (docid, tf)),那么是否还需要自定义Partitioner了呢? 
答案是需要的,看来Partitioner的判断依据不是Combiner的输出啦!

public static class InverseIndexPartitioner extends HashPartitioner<Text, Text>{

        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            // TODO Auto-generated method stub
            String strKey = key.toString();
            String[] tokens = strKey.split("->");
            return super.getPartition(new Text(tokens[0]), value, numReduceTasks);
        }

    }

Reducer的设计

根据Mapper传来的输出( (term, docid), tf),这里需要进行的处理便是将具有相同term的键值对聚集在一起,并重组成( term , (docid1:tf1, docid2:tf2, …) )的输出形式。

  • 采用静态变量strWord来记录上一次reduce过程中的term ;
  • 采用静态变量map记录静态变量strWord对应的docid:tf对;
  • 处理reduce过程时,首先将key分割出term以及fileName,
  • 判断term是否与strWord相等,
  • 如果相等,首先累计额values,得到docid,tf对之后加入map;
  • 否则将strWord,map输出,并清空map,strWord赋值为term,处理当前docid,tf,并加入map;
  • 因为最后一次reduce过程不可能将它自己的数据输出,所以需要重载cleanup函数在里边进行输出
  • 还有一点需要注意,String的相等判断用“==”是不行的哦,如果用了“==”而不是“equals”,会出现什么后果呢?
public static class InverseIndexReducer extends Reducer<Text, Text, Text, Text>{

        static Map<String, Integer> map = new HashMap<String, Integer>();
        static String strWord = null;
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
                String[] tokens = key.toString().split("->");
                if (strWord == null){
                    strWord = tokens[0];
                } 
                if (strWord.equals(tokens[0])){
                    String strFileName = tokens[1];
                    int freq = 0;
                    for (Text value : values){
                        freq += Integer.parseInt(value.toString());
                    }
                    map.put(strFileName, freq);
                } else {
                    String strNewValue = "";
                    double aveFreq = 0;
                    for (Map.Entry<String, Integer> entry : map.entrySet()){
                        strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
                        aveFreq += (double)entry.getValue();
                    }
                    aveFreq /= (double)map.size();
                    Text newKey = new Text(strWord);
                    map.clear();
                    context.write(newKey, new Text(strNewValue));
                    context.write(newKey, new Text("" + aveFreq));

                    strWord = tokens[0];
                    String strFileName = tokens[1];
                    int freq = 0;
                    for (Text value : values){
                        freq += Integer.parseInt(value.toString());
                    }   
                    map.put(strFileName, freq);
                }

        }
        @Override
        protected void cleanup(Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub

            String strNewValue = "";
            double aveFreq = 0;
            for (Map.Entry<String, Integer> entry : map.entrySet()){
                strNewValue += entry.getKey() + ":" + entry.getValue() + ",";
                aveFreq += (double)entry.getValue();
            }
            aveFreq /= (double)map.size();
            Text newKey = new Text(strWord);
            map.clear();
            context.write(newKey, new Text(strNewValue));
            context.write(newKey, new Text("" + aveFreq));

            super.cleanup(context);
        }


    }

main函数

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        Job job = new Job(conf, "InverseIndex");
        job.setJarByClass(InverseIndex.class);

        job.setNumReduceTasks(4);

        job.setMapperClass(InverseIndexMapper.class);
        job.setCombinerClass(InverseIndexCombiner.class);
        job.setPartitionerClass(InverseIndexPartitioner.class);
        job.setReducerClass(InverseIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

运行结果

MapReduce--倒排索引 
MapReduce--倒排索引