大数据Hadoop之倒排索引,求平均值
一.题目
倒排索引
数据准备
file_1
Welcome to MapReduce World
file_2
MapReduce is simple
file_3
MapReduce is powerful and simple
file_4
Hello MapReduce and Bye MapReduce
需求
实现输出某个单词在每个文件中出现的次数。输出如下格式:
具体单词 file_1:出现次数, file_2:出现次数, file_3:出现次数, file_4:出现次数
示例:
is file_1:0, file_2:1, file_3:1, file_4:0
计算平均成绩
数据准备
subject1
a|李一|88
a|王二|26
a|张三|99
a|刘四|58
a|陈五|45
a|杨六|66
a|赵七|78
a|黄八|100
a|周九|62
a|吴十|12
subject2
b|李一|36
b|王二|66
b|张三|86
b|刘四|56
b|陈五|43
b|杨六|86
b|赵七|99
b|黄八|80
b|周九|70
b|吴十|33
subject3
c|李一|83
c|王二|36
c|张三|92
c|刘四|58
c|陈五|75
c|杨六|66
c|赵七|63
c|黄八|60
c|周九|62
c|吴十|72
数据说明:
三门subject的数据,数据用 | 分割,第一列代表科目,第二列是名字,第三列是成绩。
需求
输出每个人的总分以及平均分。
输出示例:
刘四 a:58 b:56 c:58 Total:172 Avg:57
吴十 a:12 b:33 c:72 Total:117 Avg:39
二.实验代码
1.倒排索引
package Hadoop1;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex {
public static class Map extends Mapper<Object, Text, Text, Text> {
private Text keyInfo = new Text(); // 存储单词和URL组合
private Text valueInfo = new Text(); // 存储词频
private FileSplit split; // 存储Split对象
// 实现map函数
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 获得<key,value>对所属的FileSplit对象
//把词频作为值,把单词和URI组成key值
split = (FileSplit) context.getInputSplit();
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
int splitIndex = split.getPath().toString().indexOf("file");
keyInfo.set(itr.nextToken() + ":" + split.getPath().toString().substring(splitIndex));
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}
public static class Combine extends Reducer<Text, Text, Text, Text> {
private Text info = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 统计词频
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
// 重新设置value值由URL和词频组成
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
// 重新设置key值为单词
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 生成文档列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString() + ";";
}
result.set(fileList);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Inverted Index");
job.setJarByClass(InvertedIndex.class);
// 设置Map、Combine和Reduce处理类
job.setMapperClass(Map.class);
job.setCombinerClass(Combine.class);
job.setReducerClass(Reduce.class);
// 设置Map输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置Reduce输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出目录
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2.平均值
package Hadoop2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class AverageCore {
public static class Map extends Mapper<Object, Text, Text, Text> {
private Text keyInfo = new Text();//keyInfo 存储单词和URL组合
private Text valueInfo = new Text();//valueInfo 存储词频
private FileSplit split;//split 存储Split对象
/// 实现map函数
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 获得<key,value>对所属的FileSplit对象
split = (FileSplit) context.getInputSplit();
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
String str[] = itr.nextToken().split("\\|");
System.out.println("str1 = "+str[1]+"str[2] = "+str[2]);
keyInfo.set(str[1]);
valueInfo.set(str[0]+":"+str[2]);
context.write(keyInfo, valueInfo);
}
}
}
public static class Combine extends Reducer<Text, Text, Text, Text> {
private Text info = new Text();
//实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {
StringBuilder str = new StringBuilder();
for (Text value : values) {
str.append(value.toString());
}
info.set(str.toString());
key.set(key);
context.write(key, info);
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
//实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//统计总分
int sum = 0;
StringBuilder sb = new StringBuilder(key.toString()+" ");
for (Text value : values) {
String str = value.toString();
sum += Integer.parseInt(str.split(":")[1]);
sb.append(str+" ");
}
result.set("Total:"+sum+" Avg:"+sum/3);
// 重新设置key值为单词
key.set(sb.toString());
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "Inverted Index");
job.setJarByClass(AverageCore.class);
// 设置Map、Combine和Reduce处理类
job.setMapperClass(Map.class);
job.setCombinerClass(Combine.class);
job.setReducerClass(Reduce.class);
// 设置Map输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置Reduce输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出目录
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
三.实验总结
1.在外部eclipse中写好代码,将hadoop依赖手动导入项目中,新建lib文件夹,复制进去。
2.将完成的项目打包为jar包,共享到虚拟机中,建input/input1文件夹,将数据放入其中,用hadoop jar命令运行(例如:hadoop jar 项目.jar Hadoop1.InvertedIndex input output),运行结束,会生成output/output1文件夹,结果存储其中。
上一篇: 验证码与base64转换的工具类