Hadoop Series (4): Implementing an Inverted Index with MapReduce

1. Inverted Index

    Inverted indexes are widely used in information retrieval. Figure 1 illustrates Boolean retrieval: for each word, the index records the documents in which that word appears. When the query term blue is entered, the index returns the IDs of all documents containing blue. We record which documents each word appears in, rather than which words each document contains, because the vocabulary of common words is far smaller than the number of documents.

                         [Figure 1: The Boolean inverted-index process]
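
For a concrete (made-up) example, suppose doc1 contains "blue sky blue sea" and doc2 contains "red sea". The index maps each word to the documents it occurs in, together with its count in each document, which is exactly the output format of the job below:

blue    doc1:2
red     doc2:1
sea     doc1:1  doc2:1
sky     doc1:1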

2. MapReduce Code
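
The complete job is listed below: the Mapper tokenizes each line, combines counts for words repeated within that line (in-mapper combining), and emits <word, "docid:count"> pairs; the Reducer sums the counts per document and emits each word's posting list. The same Reducer is also reused as a Combiner (see the note in the driver).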

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class InvertedIndex {

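    // Mapper: invoked once per input line; pre-aggregates word counts for the
    // line (in-mapper combining) and emits <word, "docid:count"> pairs.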
    public static class MyMap extends Mapper<Object, Text, Text, Text> {

        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            HashMap<String, Integer> dict = new HashMap<>();
            // Derive the document id from the input file name (drop the extension)
            String docid = ((FileSplit) context.getInputSplit()).getPath().getName().split("\\.")[0];

            StringTokenizer itr = new StringTokenizer(value.toString());
            // In-mapper combining: count each word's occurrences within this
            // line, conceptually building <word, <docid, n>> before emitting
            while (itr.hasMoreTokens()) {
                String token = itr.nextToken();
                dict.put(token, dict.getOrDefault(token, 0) + 1);
            }

            // Emit <word, "docid:count"> for every distinct word on this line
            for (Map.Entry<String, Integer> entry : dict.entrySet()) {
                word.set(entry.getKey());
                context.write(word, new Text(docid + ":" + entry.getValue()));
            }
        }
    }

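    // Reducer: sums the per-document counts for each word and emits the
    // word's posting list ("docid1:n1 \t docid2:n2 ...").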
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {


        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            HashMap<String, Integer> dict = new HashMap<>();

            // Sum this word's occurrences per document across all map outputs
            for (Text tempText : values) {
                String[] attr = tempText.toString().trim().split(":");
                String docid = attr[0];
                int n = Integer.parseInt(attr[1]);
                dict.put(docid, dict.getOrDefault(docid, 0) + n);
            }

            // Build the word's posting list: "docid1:n1<TAB>docid2:n2<TAB>..."
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Integer> entry : dict.entrySet()) {
                result.append(entry.getKey()).append(":").append(entry.getValue()).append("\t");
            }

            context.write(key, new Text(result.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MinMaxCountDriver <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "inverted index");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(MyMap.class);
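        // Note: reusing the reducer as a combiner only works here because each
        // input split covers a single file, so every combined value still holds
        // exactly one "docid:count" pair when it reaches the reducer.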
        job.setCombinerClass(MyReduce.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
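
To run the job (the jar name and HDFS paths here are assumptions for illustration, not from the original article), upload the documents and invoke the driver:

$ hadoop fs -put doc1.txt doc2.txt /input
$ hadoop jar inverted-index.jar InvertedIndex /input /output
$ hadoop fs -cat /output/part-r-00000
blue    doc1:2
red     doc2:1
sea     doc1:1  doc2:1
sky     doc1:1

Each output line is a word followed by its tab-separated posting list, matching the toy example from Section 1.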