用hadoop实现倒排索引简单实例
用hadoop实现倒排索引简单实例
倒排索引是文档搜索系统中常用的数据结构,即根据内容进行文档的搜索,本次我们利用mapReduce来分析和统计单词在每个文档中的权重,输入2个单词的txt文本,经过mapreduce的处理,将结果以{单词 1.txt:权重,2.txt:权重}逐行输出到output目录下。切记单个文件不要超过HDFS单个块的大小,保证一个文件一个split.否则会出现词频统计不完全。也可以重新实现一个inputFomart类。将每一个文件划分成一个split。
下面简单的讲解下这个程序,InvertedMapper类继承自Mapper类,重写了它的map方法,也可以实现Mapper接口,这个我们以后再说。map方法以{key:value}={单词:文档,1}的形式存于输出到context中。在InvertedCombine中把中间结果进行合并,减少网络传输,加快执行速度。
这里combine还完成了一个操作,就是将key,value的形式,变为了{单词,文档:权重},由于这里采用的是mapreduce默认的Hashpartitioner处理的,因此也保证了单词名相同的记录会被分配到同一个reducer中进行处理。后期可以重新指定partitionerClass,即使key不同,也可同样合并到一个reduce中处理。
具体代码如下
“`package com.ly.study.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedMapperReducer {
public static class InvertedMapper extends Mapper<Object, Text,Text, Text> {
private Text word = new Text();
private Text mapOutKey = new Text();//Text 文本类型 相当于java类型中的String
private Text one = new Text("1"); // IntWritable 类型相当于java类型中的Int类型
//以上2中类型都是mapperReducer中常用的数据类型,他们都实现了序列化接口
//除了下面的的基本类,hadoop还提供基本的自定是数据类型,这个我们以后再说
/*BooleanWritable:标准布尔型
ByteWritable:单字节
IntWritable:整型数
LongWritable:长整型数
DoubleWritable:双字节
FloatWritable:浮点数
Text:用UTF8格式的文本类型
NullWritable:当<key, value>中的key或value为空时使用*/
private FileSplit fs;//文件分割类,实现了抽象类inputSplit,此处使用它可以获取文件的文件路径等信息。
@Override
protected void map(Object key, Text value, Context context)//context mapperReducer中的上下文提供了大量关入该job的相关信息和操作
throws IOException, InterruptedException {
fs = (FileSplit)context.getInputSplit();//将input
String docName = fs.getPath().getName();//获取当前的文件名。
StringTokenizer itTokenizer = new StringTokenizer(value.toString());
while(itTokenizer.hasMoreTokens()){
mapOutKey.set(itTokenizer.nextToken());
word.set(mapOutKey+":"+docName);
context.write(word, one);
}
}
}
public static class InvertedCombiner extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
private Text combinerOutKey = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
int sum =0;
for(Text val:values){
sum += Integer.parseInt(val.toString());
}
String[] strs = key.toString().split(":");
combinerOutKey.set(strs[0]);
result.set(strs[1] + ":" + sum);
context.write(combinerOutKey, result);
}
}
public static class InvertedReducer extends Reducer<Text, Text, Text, Text>{
private Text result = new Text();
private Text ReducerOutKey = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for(Text val:values){
sb.append(val.toString());
sb.append(",");
}
String temp = sb.toString().substring(0,sb.toString().lastIndexOf(","));
ReducerOutKey.set(temp);
context.write(key,ReducerOutKey);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
Job job = new Job(conf, "invertedIndex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedCombiner.class);
job.setReducerClass(InvertedReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
System.out.println("yes");
}
}
结果如下:
下一篇: Hadoop的序列化与反序列化实操