Hadoop之MapReduce应用实例2(分组排序)
程序员文章站
2022-06-30 11:47:24
...
一、数据集及程序要求
数据集ramen-ratings.txt,包含全世界2580种方便面的品牌、国家/地区、包装类型、评分等内容,使用MapReduce计数并求平均值,输出:每个国家/地区最受欢迎的三大方便面品牌,按评分,去掉重复。
- 所有国家方便面的平均分(即哪国的方便面最好吃)
数据获取地址:https://github.com/ordinaryload/Hadoop-tools
二、源代码编写
2.1 打开IntelliJ IDEA创建Maven项目
2.2 pom.xml 文件如下:
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.mrunit/mrunit -->
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j</artifactId>
<version>2.5</version>
<type>pom</type>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
2.3 编写多值键类
- 代码逻辑:
键类型:Ramen
属性:包含国家/地区 Text 和 评分FloatWritable两个属性
排序比较器:一级排序按国家/地区升序排列,二级排序按评分降序排列
代码如下:
public class Ramen implements WritableComparable<Ramen> {
private FloatWritable star = new FloatWritable();//用于存储评分数据
private Text nation = new Text();//用于存取国家数据
public FloatWritable getStar() {
return star;
}
public void setStar(FloatWritable star) {
this.star = star;
}
public Text getNation() {
return nation;
}
public void setNation(Text nation) {
this.nation = nation;
}
//Key排序
@Override
public int compareTo(Ramen o) {
//一次排序:一级排序按国家/地区升序排列
int cmp = this.getNation().compareTo(o.getNation());
//如果股票代码相同,则按时间排序
if(cmp != 0)
return cmp;
//二次排序:二级排序按评分降序排列(结果乘-1)
return -this.getStar().compareTo(o.getStar());
}
//写入数据至流
//用于框架对数据的处理
//注意读readFields和写write的顺序一致
@Override
public void write(DataOutput dataOutput) throws IOException {
nation.write(dataOutput);
star.write(dataOutput);
}
//从流中读取数据
//将框架返回的数据提取出到对应属性中来
//注意读readFields和写write的顺序一致
@Override
public void readFields(DataInput dataInput) throws IOException {
nation.readFields(dataInput);
star.readFields(dataInput);
}
}
2.4 组排序
组排序:RamenGroup
排序比较器:按国家/地区升序排列
代码如下:
public class RamenGroup extends WritableComparator {
public RamenGroup(){
super(Ramen.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Ramen key1 = (Ramen)a;
Ramen key2 = (Ramen)b;
return key1.getNation().compareTo(key2.getNation());
}
}
2.5 Mapper类
输入:一行数据
处理:使用\t将字符串split成数组,提取国家/地区、品牌和评分,分别作为键和值输出
输出:< Ramen , 品牌>
代码如下:
public class RamenMapper extends Mapper<LongWritable, Text, Ramen, Text>{
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
String line = value.toString();
if(line.contains("N/A")) //忽略空行和有无效数据的行
return;
//初始化数据
String[] strs = line.split("\t");
//清理无效数据
if (strs[5].equals("Unrated"))
return;
Ramen ramen = new Ramen();
ramen.setNation(new Text(strs[4]));
ramen.setStar(new FloatWritable(Float.parseFloat(strs[5].trim())));
//以< Ramen, 品牌>作为键值对输出
context.write(ramen, new Text(strs[1]));
System.out.println("key: " + strs[4] + " value: " + strs[5]);
}
}
2.6 Reduce类
输入:< Ramen ,[品牌]>
处理:从前往后依次找到3个品牌并输出,遇到相同品牌则忽略,若查询不到3个则退出
输出:<国家/地区名,"品牌, 评分\t品牌2, 评分2\t品牌3, 评分3 ">
在这里按照同样的输入,可以打印出来数据看前面的排序是否已做好。
经过打印的日志,发现排序是没有问题的,那么接下来便是最后一个步骤了,从前往后依次找到3个品牌并输出,遇到相同品牌则忽略,若查询不到3个则退出。
在这里我采用的是LinkedHashMap来实现的,因为LinkedHashMap的内部存储是按顺序的,并且我写了个类来继承LinkedHashMap,以达到后面若有相同的键,它的值不会覆盖前面的值,这样就能保证去重并且按照原来的降序效果。
继承类为:
public class MyHashMap extends LinkedHashMap {
@Override
public Object put(Object key, Object value) {
if(!this.containsKey(key))
return super.put(key,value);
return null;
}
}
代码如下:
public class RamenReducer extends Reducer<Ramen, Text, Text, Text> {
@Override
protected void reduce(Ramen key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int j = 0;
String mark = "1";
Map<String,String> map = new MyHashMap();//用于将品牌和评分绑定在一个键值对,并且通过实现重写的HashMap去重
for (Text value : values){
System.out.println("INFO[" + mark + "] key:" + key.getNation().toString() + " 品牌:" + " " + value.toString() + " value:"+ key.getStar().toString() + " index:" + j);
//list.add(key.getNation().toString());
map.put(value.toString(),key.getStar().toString());
j ++;
}
String [] resultValue = new String[6];
j=0;
int i = 0;
//若查询不到3个品牌则退出
if(map.size() < 3){
return;
}
//对前3个品牌进行赋值
for(String mapKey:map.keySet()){
if(j < 3){
resultValue[i] = mapKey;
i++;
resultValue[i] = map.get(mapKey);
i++;
}
j++;
}
//以<国家/地区名,"品牌, 评分\t品牌2, 评分2\t品牌3, 评分3 ">格式输出
context.write(key.getNation(),new Text(resultValue[0] + "," + resultValue[1] + "\t" + resultValue[2] + "," + resultValue[3] + "\t" + resultValue[4] + "," + resultValue[5]));
}
}
2.7 编写Run实现
public class RamenRun extends Configured implements Tool {
public static void main(String[] args) throws Exception{
BasicConfigurator.configure();
int res = ToolRunner.run(new RamenRun(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
//System.setProperty("HADOOP_USER_NAME","root");
////////////////////////////////////////////////////////////
// 创建作业,配置作业所需参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://192.168.137.150:9000");
// 创建作业
Job job = Job.getInstance(conf, "RamenRun");
// 注入作业的主类
job.setJarByClass(RamenRun.class);
// 为作业注入Map和Reduce类
job.setMapperClass(RamenMapper.class);
job.setGroupingComparatorClass(RamenGroup.class);
job.setReducerClass(RamenReducer.class);
// 指定输入类型为:文本格式文件;注入文本输入格式类
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("/ramen-ratings.txt")); //HDFS路径
// 指定输出格式为:文本格式文件;注入文本输入格式类
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Ramen.class);
job.setOutputValueClass(Text.class);
// 指定作业的输出目录
TextOutputFormat.setOutputPath(job, new Path("/test/out31")); //HDFS路径
////////////////////////////////////////////////////////////
// 作业的执行流程
// 执行MapReduce
boolean res = job.waitForCompletion(true);
if(res) {
System.out.println("执行成功");
return 0;
}
else
return -1;
}
}
执行结果:
上一篇: MapReduce原理详解