欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop之MapReduce应用实例2(分组排序)

程序员文章站 2022-06-30 11:47:24
...

一、数据集及程序要求

  数据集ramen-ratings.txt,包含全世界2580种方便面的品牌、国家/地区、包装类型、评分等内容,使用MapReduce计数并求平均值,输出:每个国家/地区最受欢迎的三大方便面品牌,按评分,去掉重复。

二、源代码编写

2.1 打开IntelliJ IDEA创建Maven项目

2.2 pom.xml 文件如下:

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.mrunit/mrunit -->
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>2.5</version>
            <type>pom</type>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>3.1.0</version>
        </dependency>
    </dependencies>

2.3 编写多值键类

  • 代码逻辑:
    键类型:Ramen
    属性:包含国家/地区 Text 和 评分FloatWritable两个属性
    排序比较器:一级排序按国家/地区升序排列,二级排序按评分降序排列
    代码如下:
public class Ramen implements WritableComparable<Ramen> {

    private FloatWritable star = new FloatWritable();//用于存储评分数据
    private Text nation = new Text();//用于存取国家数据

    public FloatWritable getStar() {
        return star;
    }

    public void setStar(FloatWritable star) {
        this.star = star;
    }

    public Text getNation() {
        return nation;
    }

    public void setNation(Text nation) {
        this.nation = nation;
    }

    //Key排序
    @Override
    public int compareTo(Ramen o) {
        //一次排序:一级排序按国家/地区升序排列
        int cmp = this.getNation().compareTo(o.getNation());
        //如果股票代码相同,则按时间排序
        if(cmp != 0)
            return cmp;
        //二次排序:二级排序按评分降序排列(结果乘-1)
        return -this.getStar().compareTo(o.getStar());
    }

    //写入数据至流
    //用于框架对数据的处理
    //注意读readFields和写write的顺序一致
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        nation.write(dataOutput);
        star.write(dataOutput);
    }

    //从流中读取数据
    //将框架返回的数据提取出到对应属性中来
    //注意读readFields和写write的顺序一致
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        nation.readFields(dataInput);
        star.readFields(dataInput);
    }
}

2.4 组排序

组排序:RamenGroup
排序比较器:按国家/地区升序排列
代码如下:

public class RamenGroup extends WritableComparator {

    public RamenGroup(){
        super(Ramen.class,true);
    }


    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Ramen key1 = (Ramen)a;
        Ramen key2 = (Ramen)b;
        return key1.getNation().compareTo(key2.getNation());

    }
}

2.5 Mapper类

输入:一行数据
处理:使用\t将字符串split成数组,提取国家/地区、品牌和评分,分别作为键和值输出
输出:< Ramen , 品牌>
代码如下:

public class RamenMapper extends Mapper<LongWritable, Text, Ramen, Text>{
    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if(line.contains("N/A"))    //忽略空行和有无效数据的行
            return;
        //初始化数据
        String[] strs = line.split("\t");
        //清理无效数据
        if (strs[5].equals("Unrated"))
            return;

        Ramen ramen = new Ramen();
        ramen.setNation(new Text(strs[4]));
        ramen.setStar(new FloatWritable(Float.parseFloat(strs[5].trim())));
        //以< Ramen, 品牌>作为键值对输出
        context.write(ramen, new Text(strs[1]));
        System.out.println("key: " + strs[4] + " value: " + strs[5]);
    }
}

2.6 Reduce类

输入:< Ramen ,[品牌]>
处理:从前往后依次找到3个品牌并输出,遇到相同品牌则忽略,若查询不到3个则退出
输出:<国家/地区名,"品牌, 评分\t品牌2, 评分2\t品牌3, 评分3 ">
Hadoop之MapReduce应用实例2(分组排序)
在这里按照同样的输入,可以打印出来数据看前面的排序是否已做好。
Hadoop之MapReduce应用实例2(分组排序)
Hadoop之MapReduce应用实例2(分组排序)
Hadoop之MapReduce应用实例2(分组排序)
经过打印的日志,发现排序是没有问题的,那么接下来便是最后一个步骤了,从前往后依次找到3个品牌并输出,遇到相同品牌则忽略,若查询不到3个则退出。
在这里我采用的是LinkedHashMap来实现的,因为LinkedHashMap的内部存储是按顺序的,并且我写了个类来继承LinkedHashMap,以达到后面若有相同的键,它的值不会覆盖前面的值,这样就能保证去重并且按照原来的降序效果。
继承类为:

public class MyHashMap extends LinkedHashMap {
    @Override
    public Object put(Object key, Object value) {
        if(!this.containsKey(key))
            return super.put(key,value);
        return null;
    }
    
}

代码如下:

public class RamenReducer extends Reducer<Ramen, Text, Text, Text> {


    @Override
    protected void reduce(Ramen key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int j = 0;
        String mark = "1";
        Map<String,String> map = new MyHashMap();//用于将品牌和评分绑定在一个键值对,并且通过实现重写的HashMap去重
        for (Text value : values){
            System.out.println("INFO[" + mark + "] key:" + key.getNation().toString()  + "       品牌:" + " " + value.toString() + "      value:"+ key.getStar().toString() + " index:" + j);
            //list.add(key.getNation().toString());
            map.put(value.toString(),key.getStar().toString());
            j ++;
        }

        String [] resultValue = new String[6];
        j=0;
        int i = 0;
        //若查询不到3个品牌则退出
        if(map.size() < 3){
            return;
        }
        //对前3个品牌进行赋值
        for(String mapKey:map.keySet()){
            if(j < 3){
                resultValue[i] = mapKey;
                i++;
                resultValue[i] = map.get(mapKey);
                i++;
            }
            j++;
        }
        //以<国家/地区名,"品牌, 评分\t品牌2, 评分2\t品牌3, 评分3 ">格式输出
        context.write(key.getNation(),new Text(resultValue[0] + "," + resultValue[1] + "\t" + resultValue[2] + "," + resultValue[3] + "\t" + resultValue[4] + "," + resultValue[5]));

    }

}

2.7 编写Run实现

public class RamenRun extends Configured implements Tool {
    public static void main(String[] args) throws Exception{
        BasicConfigurator.configure();
        int res = ToolRunner.run(new RamenRun(), args);
        System.exit(res);
    }
    public int run(String[] args) throws Exception {
        //System.setProperty("HADOOP_USER_NAME","root");
        ////////////////////////////////////////////////////////////
        // 创建作业,配置作业所需参数
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://192.168.137.150:9000");
        // 创建作业
        Job job = Job.getInstance(conf, "RamenRun");

        // 注入作业的主类
        job.setJarByClass(RamenRun.class);

        // 为作业注入Map和Reduce类
        job.setMapperClass(RamenMapper.class);
        job.setGroupingComparatorClass(RamenGroup.class);
        job.setReducerClass(RamenReducer.class);

        // 指定输入类型为:文本格式文件;注入文本输入格式类
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("/ramen-ratings.txt")); //HDFS路径

        // 指定输出格式为:文本格式文件;注入文本输入格式类
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Ramen.class);
        job.setOutputValueClass(Text.class);

        // 指定作业的输出目录
        TextOutputFormat.setOutputPath(job, new Path("/test/out31"));     //HDFS路径

        ////////////////////////////////////////////////////////////
        // 作业的执行流程
        // 执行MapReduce
        boolean res = job.waitForCompletion(true);
        if(res) {
            System.out.println("执行成功");
            return 0;
        }
        else
            return -1;
    }
}

执行结果:
Hadoop之MapReduce应用实例2(分组排序)