欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

倒排索引案例(多job串联)两个MapReduce串联工作

程序员文章站 2022-05-24 15:29:55
...

需求:有大量的文本(文档、网页),需要建立搜索索引

输入 输出

倒排索引案例(多job串联)两个MapReduce串联工作

分析:

分两次MapReduce工作,第一次预期输出

atguigu--a.txt	3
atguigu--b.txt	2
atguigu--c.txt	2
pingping--a.txt	 1
pingping--b.txt	3
pingping--c.txt	 1
ss--a.txt	2
ss--b.txt	1
ss--c.txt	1

第二次预期输出

atguigu	c.txt-->2	b.txt-->2	a.txt-->3	
pingping	c.txt-->1	b.txt-->3	a.txt-->1	
ss	c.txt-->1	b.txt-->1	a.txt-->2	

1)第一次处理

(1)第一次处理,编写OneIndexMapper

package com.lzz.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	String name;
	Text k=new Text();
	IntWritable v=new IntWritable(1);
	@Override
	protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		//获取文件名
		FileSplit split=(FileSplit)context.getInputSplit();
		name=split.getPath().getName();
	}
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		//0行号(key) atguigu pingping(value)获取一行
		String line=value.toString();
		//切割
		String words[]=line.split(" ");
		for (String word : words) {
			//3拼接
			k.set(word+"--"+name);
			//4输出 atguigu--a.txt(key) 1(value)
			context.write(k, v);
		}
	}
}

(2)第一次处理,编写OneIndexReducer

package com.lzz.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneIndexReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		//1汇总
		//atguigu--a.txt(key) 1(value)
		//atguigu--a.txt(key) 1(value)
		//atguigu--a.txt(key) 1(value)
		int sum=0;
		for (IntWritable value : values) {
			sum+=value.get();
		}
		//2写出
		//atguigu--a.txt(key) 3(value)
		context.write(key, new IntWritable(sum));
	}
}

(3)第一次处理,编写OneIndexDriver

package com.lzz.mapreduce.index;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.amazonaws.services.elasticmapreduce.util.ResizeJobFlowStep.OnArrested;
import com.lzz.mapreduce.WordcountDriver;
import com.lzz.mapreduce.WordcountMapper;
import com.lzz.mapreduce.WordcountReducer;

public class OneIndexDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		args=new String[] {"g:/input/index","g:/output"}; 
		
		Configuration conf=new Configuration();
		//1获取job信息
		Job job=Job.getInstance(conf);
		
		//2获取jar包信息
		job.setJarByClass(OneIndexDriver.class);
		
		//3关联自定义的mapper和reducer
		job.setMapperClass(OneIndexMapper.class);
		job.setReducerClass(OneIndexReduce.class);
		
		//4设置map输出数据类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		//5设置最终输出数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//job.setCombinerClass(WordcountCombiner.class);
		job.setCombinerClass(WordcountReducer.class);
		
		//6设置数据输入和输出文件路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//7提交代码
		boolean result=job.waitForCompletion(true);
		System.exit(result?0:1);
	}
}

(4)查看第一次输出结果

倒排索引案例(多job串联)两个MapReduce串联工作

2)第二次处理(将第一次的输出结果作为第二次的输入)

(1)第二次处理,编写TwoIndexMapper

package com.lzz.mapreduce.twoindex;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//注意不是和上一次输出结果的Text, IntWritable,而是重新MapReduce的Reader()开始,从LongWritable, Text开始
public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text>{
	Text k=new Text();
	Text v=new Text();
	//atguigu--a.txt	3
	//atguigu--b.txt	2
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line=value.toString();
		String[] words=line.split("--");
	
		k.set(words[0]);	//atguigu 
		v.set(words[1]);	//a.txt 3
		context.write(k, v);
		
	}
}

(2)第二次处理,编写TwoIndexReducer

package com.lzz.mapreduce.twoindex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text>{
	Text v=new Text();
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		//输入
		//atguigu a.txt	3
		//atguigu b.txt	2
		//atguigu c.txt	2
		
		//1拼接
		StringBuilder sBuilder=new StringBuilder();
		for (Text value : values) {
			sBuilder.append(value.toString().replace("\t", "-->")+"\t");
			
			//c.txt-->2	b.txt-->2	a.txt-->3
		}
		v.set(sBuilder.toString());
		//输出
		context.write(key,v);
	}
}

(3)第二次处理,编写TwoIndexDriver

package com.lzz.mapreduce.twoindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.amazonaws.services.elasticmapreduce.util.ResizeJobFlowStep.OnArrested;
import com.lzz.mapreduce.WordcountDriver;
import com.lzz.mapreduce.WordcountMapper;
import com.lzz.mapreduce.WordcountReducer;

public class TwoIndexDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		args=new String[] {"g:/output","g:/output1"}; 
		
		Configuration conf=new Configuration();
		//1获取job信息
		Job job=Job.getInstance(conf);
		
		//2获取jar包信息
		job.setJarByClass(TwoIndexDriver.class);
		
		//3关联自定义的mapper和reducer
		job.setMapperClass(TwoIndexMapper.class);
		job.setReducerClass(TwoIndexReducer.class);
		
		//4设置map输出数据类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		//5设置最终输出数据类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		//6设置数据输入和输出文件路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//7提交代码
		boolean result=job.waitForCompletion(true);
		System.exit(result?0:1);
	}
}

运行结果

倒排索引案例(多job串联)两个MapReduce串联工作

相关标签: hadoop mapreduce