欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

大数据私房菜--MapReduce实战案例

程序员文章站 2024-03-22 12:30:10
...

1. MapReduce需求

有三个文件file1、file2、file3,文件中每一行都是一个数字,MapReduce程序读取这三个文件,对三个文件中的数字进行整体升序排序,并输出到一个结果文件中,结果文件中的每一行有两个数字(两个数字之间使用制表符分隔),第一个数字代表排序,第二个数字代表原始数据

2.需求分析

  • 输入:三个文件每一行为一个数字

  • 输出:对三个文件的数字进行全排序,按升序排序,并输出其对应的序号

  • 分析:对三个文件的数字进行全排序,则只用到一个Reduce,那么使用默认值即可。我们需要充分运用MapRudce的排序,即Map端的快速排序,溢写合并过程的归并排序,这时候我们只需要在Map端将数字作为key并指向一个Reduce,即可完成升序的全排序(默认为升序)。另外需要输出一个序号,那么序号的来源可以直接通过索引值获取即可,因为我们的经过shuffle排序之后,就已经得到了一个升序的数列,那么递归循环这个数列就可以得到每一个数字对应的序号

  • Map的K-V设计:需要利用map的排序就要把数字作为key,那么Map输出的<K, V>我们可以设计为<IntWritable, NullWritable>

  • Reduce的K-V设计:默认的numPartitions为1,这正是我们需要的,把所有的Map输出结果集都输入到同一个ReduceTask里面去排序之后,ReduceTask通过for循环输出k的值,并把索引值定义为其序号,然后将<索引值, key>构造为一个Text对象,作为Reduce的输出即可完成需求

3.Mapper设计

package com.lagou.order;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * # _*_ coding: utf-8 _*_
 * # @Time : 2020/7/7 13:00
 * # @Author : Depa
 * # @Version:V 0.1
 **/
public class OrderMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {

    @Override
    protected void map(final LongWritable key, final Text value, final Context context) throws IOException, InterruptedException {

        // 1.读取一行文本转换为Int
        final int num = Integer.parseInt(value.toString());
        // 2.将num作为key的输出,Map输出<num , Null>
        context.write( new IntWritable(num), NullWritable.get());

    }
}
  1. 新建一个OrderMapper对象,继承Mapper;输入为<LongWritable, Text>,输出为<IntWritable, NullWritable>;Mapper的输出按照需求分析可以知道,我们把文件里面的数字放到Map的输出key当中即可,再把map的结果发到一个reduce当中去进去全排序
  2. 之后我们重写map方法,将文本的每一行通过Integer.parseInt方法转换成一个整型数值
  3. 之后我们将num作为key的输出,value赋予一个空值。即context.write( new IntWritable(num), NullWritable.get());

3.Reducer设计

package com.lagou.order;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * # _*_ coding: utf-8 _*_
 * # @Time : 2020/7/7 13:12
 * # @Author : Depa
 * # @Version:V 0.1
 **/
public class OrderReducer extends Reducer<IntWritable, NullWritable, Text, NullWritable> {

    Text k = new Text();
    int i = 0;
    @Override
    protected void reduce(final IntWritable key, final Iterable<NullWritable> values, final Context context) throws IOException, InterruptedException {

        for (final NullWritable value : values) {
            System.out.println("order:"+ i + "\t" + "value: "+key.toString());
            i ++ ;
            String tmp = i + "\t" + key.toString();
            k.set(tmp);

            context.write(k, NullWritable.get());


        }
    }
}

通过map阶段的排序之后,默认的partition分区数就是1,我们在shuffle中完成排序之后,在reduce中通过循环写reduce的输出,即索引值作为序号,key值作为也作为输出。我们设置全局变量Text作为reduce的输出,定义个i 作为序号逐渐递增。

4.DRIVER

package com.lagou.order;

import com.lagou.mr.partition.CustomPartitioner;
import com.lagou.sort.SortDriver;
import com.lagou.sort.SortMapper;
import com.lagou.sort.SortReducer;
import com.lagou.sort.SpeakBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * # _*_ coding: utf-8 _*_
 * # @Time : 2020/7/7 13:33
 * # @Author : Depa
 * # @Version:V 0.1
 **/
public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        /*
        1. 获取配置文件对象,获取job对象实例
        2. 指定程序jar的本地路径
        3. 指定Mapper/Reducer类
        4. 指定Mapper输出的kv数据类型
        5. 指定最终输出的kv数据类型
        6. 指定job处理的原始数据路径
        7. 指定job输出结果路径
        8. 提交作业
         */

//        1. 获取配置文件对象,获取job对象实例
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf, "OrderDriver");

//        2. 指定程序jar的本地路径
        job.setJarByClass(OrderDriver.class);
//        3. 指定Mapper/Reducer类
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
//        4. 指定Mapper输出的kv数据类型
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
//        5. 指定最终输出的kv数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

//        6.1 初始化inputpath和outpath,并判断output目录是否存在,存在将其删除
        Path inputPath = new Path("E:/input/");
        Path outputPath = new Path("E:/output/");

        FileSystem fs = FileSystem.get(conf);

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

//        6. 指定job输入数据路径
        FileInputFormat.setInputPaths(job, inputPath); //指定读取数据的原始路径
//        7. 指定job输出结果路径
        FileOutputFormat.setOutputPath(job, outputPath); //指定结果数据输出路径
//        8. 提交作业
        final boolean flag = job.waitForCompletion(true);
        //jvm退出:正常退出0,非0值则是错误退出
        System.exit(flag ? 0 : 1);
    }
}

通过driver去驱动MapReduce任务,在driver中我们加入了判断output目录是否存在,如果存在就删除,这样就可以方便我们一直重复运行。

5 数据验证

输入文件:
大数据私房菜--MapReduce实战案例

输出文件:
大数据私房菜--MapReduce实战案例