
Secondary Sort and the Overall MapReduce Workflow

程序员文章站 2022-04-12 09:20:02

I. Secondary Sort (Grouping Comparator)

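  Requirement: we have an order data file whose records contain an order id, a product id, and a product price. Sort by order id in ascending order and by price in descending order, produce as many output files as there are distinct order ids, and write only one record into each file: the most expensive item of that order.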

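  Approach: 1. Wrap each order in an OrderBean class that implements the WritableComparable interface;

     2. Write a custom Mapper class: fix its input and output data types and implement the business logic;

     3. Write a custom partitioner that returns a partition number based on the order id;

     4. Write a custom Reducer class;

     5. Write the grouping comparator class OrderGroupingComparator, which extends WritableComparator, defines a no-arg constructor, and overrides the compare method;

     6. Write the Driver class.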

  The code is as follows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author: princesshug
 * @date: 2019/3/25, 21:42
 * @blog: https://www.cnblogs.com/hellobigtable/
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private int orderId;
    private double orderPrice;

    // Hadoop creates keys through this no-arg constructor before calling readFields()
    public OrderBean() {
    }

    public OrderBean(int orderId, double orderPrice) {
        this.orderId = orderId;
        this.orderPrice = orderPrice;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(double orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return orderId + "\t" + orderPrice;
    }

    // Sort order: orderId ascending, then orderPrice descending
    @Override
    public int compareTo(OrderBean o) {
        int rs = Integer.compare(this.orderId, o.getOrderId());
        if (rs == 0) {
            // Arguments swapped for descending order; equal prices compare as 0
            rs = Double.compare(o.getOrderPrice(), this.orderPrice);
        }
        return rs;
    }

    // Field order here must match readFields()
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(orderPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        orderPrice = in.readDouble();
    }
}

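import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line: orderId \t productId \t price
        String line = value.toString();

        // Split the line into fields
        String[] fields = line.split("\t");

        // Wrap the order id and price in an OrderBean (the product id is not needed)
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(orderId, orderPrice);

        // Emit the bean as the key; the value carries no data
        context.write(orderBean, NullWritable.get());
    }
}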

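import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        // numPartitions equals the number of ReduceTasks;
        // masking with Integer.MAX_VALUE keeps the result non-negative
        return (orderBean.getOrderId() & Integer.MAX_VALUE) % numPartitions;
    }
}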

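import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // All records of one order form a single group (see the grouping comparator).
        // Before the values are iterated, key holds the group's first record, i.e. the highest price.
        context.write(key, NullWritable.get());
    }
}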

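import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {
    // The parent constructor must be called via super to register OrderBean as the compared type;
    // true tells WritableComparator to create key instances for deserialization
    public OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    // Keys with the same orderId go to the same reduce() call, regardless of price
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        return Integer.compare(aBean.getOrderId(), bBean.getOrderId());
    }
}

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Configuration and Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Class used to locate the job jar
        job.setJarByClass(OrderDriver.class);

        // Mapper and Reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // Mapper output types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reducer (final) output types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Grouping comparator for the secondary sort
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // Partitioner
        job.setPartitionerClass(OrderPartitioner.class);

        // Number of ReduceTasks (= output files); "one file per order id" assumes exactly 3 distinct order ids
        job.setNumReduceTasks(3);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("g:\\mapreduce\\order\\in"));
        FileOutputFormat.setOutputPath(job, new Path("g:\\mapreduce\\order\\out"));

        // Submit the job and wait for it to finish
        if (job.waitForCompletion(true)) {
            System.out.println("Job completed!");
        } else {
            System.out.println("Job failed!");
        }
    }
}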

  Since this is code I have typed many times, I did not add many comments; please bear with me!
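  To see why the reducer ends up writing exactly the most expensive item per order, the path partition, then sort, then group can be simulated without a cluster. The following is a minimal plain-Java sketch, assuming the job's semantics as described above; the class names SecondarySortSim and Order are hypothetical stand-ins (Order plays the role of OrderBean), and Hadoop itself is not used.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java simulation (no Hadoop required) of the order job's
// partition -> sort -> group -> reduce path. All names are hypothetical.
class SecondarySortSim {

    // Hypothetical stand-in for OrderBean: only the fields the job uses.
    static final class Order {
        final int orderId;
        final double price;

        Order(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }

        @Override
        public String toString() {
            return orderId + "\t" + price;
        }
    }

    // Sort comparator: orderId ascending, then price descending.
    static final Comparator<Order> SORT =
            Comparator.comparingInt((Order o) -> o.orderId)
                      .thenComparing((Order a, Order b) -> Double.compare(b.price, a.price));

    // Grouping comparator: looks at orderId only.
    static final Comparator<Order> GROUP = Comparator.comparingInt((Order o) -> o.orderId);

    // Same formula as the partitioner above.
    static int partition(Order o, int numPartitions) {
        return (o.orderId & Integer.MAX_VALUE) % numPartitions;
    }

    // Returns, for each reduce task, the records the reducer would write.
    static List<List<Order>> run(List<Order> input, int numReduceTasks) {
        List<List<Order>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Order o : input) {
            partitions.get(partition(o, numReduceTasks)).add(o);
        }

        List<List<Order>> output = new ArrayList<>();
        for (List<Order> part : partitions) {
            part.sort(SORT); // each partition reaches the reducer sorted by the sort comparator
            List<Order> written = new ArrayList<>();
            Order groupHead = null;
            for (Order o : part) {
                // A new group (one reduce() call) starts when the orderId changes.
                if (groupHead == null || GROUP.compare(groupHead, o) != 0) {
                    groupHead = o;
                    written.add(o); // the reducer writes only the group's first key
                }
            }
            output.add(written);
        }
        return output;
    }
}
```

  With three reduce tasks and order ids 1, 2 and 3, each partition receives exactly one order, so each output file holds one line. Note that order 3 lands in partition 0, because 3 % 3 == 0.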

 

II. The Overall MapReduce Workflow

  1. Suppose there is a 200 MB text file to be processed; the job is first submitted from the client;

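  2. The client computes the input splits and submits the split information, together with the job, to YARN; one MapTask is then started per split. By default each split corresponds to one 128 MB HDFS block, so the 200 MB file yields a 128 MB split and a 72 MB split, i.e. 2 MapTasks;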
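  The split arithmetic in step 2 can be sketched as follows. This is a simplified model of the loop in FileInputFormat.getSplits(), assuming the default 128 MB block size and default minimum/maximum split sizes; the class name SplitCalc is hypothetical, and the real method also handles multiple files, block locations and non-splittable files.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how FileInputFormat cuts one splittable file (SplitCalc is hypothetical).
class SplitCalc {
    // The last split may be up to 10% larger than splitSize instead of leaving a tiny split.
    static final double SPLIT_SLOP = 1.1;

    // splitSize = max(minSize, min(maxSize, blockSize))
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns the length of each split; one MapTask is started per split.
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> splits = new ArrayList<>();
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits.add(remaining);
        }
        return splits;
    }
}
```

  Because of the 1.1 slop factor, a 140 MB file would still produce a single split, while the 200 MB file produces two.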

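  3. By default the job uses TextInputFormat (a subclass of FileInputFormat), which reads the file data into the MapTask line by line: the key is the byte offset of the line and the value is the line's content;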
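  The (key, value) pairs that TextInputFormat hands to map() can be illustrated with a small sketch. It is a simplified model, assuming UTF-8 text and only '\n' as a line terminator (Hadoop's own line reader also handles "\r\n"); the class name LineRecords is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified model of TextInputFormat records (LineRecords is hypothetical):
// key = byte offset where the line starts, value = the line without its newline.
class LineRecords {
    static final class Record {
        final long offset;
        final String line;

        Record(long offset, String line) {
            this.offset = offset;
            this.line = line;
        }
    }

    // Only '\n' is treated as a line terminator in this sketch.
    static List<Record> read(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        List<Record> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] == '\n') {
                records.add(new Record(start, new String(bytes, start, i - start, StandardCharsets.UTF_8)));
                start = i + 1;
            }
        }
        if (start < bytes.length) { // last line without a trailing newline
            records.add(new Record(start, new String(bytes, start, bytes.length - start, StandardCharsets.UTF_8)));
        }
        return records;
    }
}
```

  The offset counts bytes, not characters, which matters for non-ASCII text; this is why the Mapper in Section I declares its input key as LongWritable.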

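  4. The MapTask runs the business logic in map() and writes its output through the Context object (a TaskInputOutputContext) into the circular buffer; each record is tagged with its partition number as it is written;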

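  5. The circular buffer is simply a region of memory (100 MB by default, set by mapreduce.task.io.sort.mb); when the data in it reach 80% of that size (mapreduce.map.sort.spill.percent), a spill to disk is triggered;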
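  The spill trigger in step 5 can be sketched as a simple threshold check. This is a deliberately simplified, single-threaded model under stated assumptions: real Hadoop spills in a background thread while map() keeps writing into the remaining 20%, and it also accounts for per-record metadata. The class name SpillTrigger is hypothetical.

```java
// Simplified, single-threaded model of the map-side spill trigger (SpillTrigger is hypothetical).
// Hadoop spills concurrently and tracks per-record metadata; both are ignored here.
class SpillTrigger {
    private final long threshold;
    private long used = 0;
    private int spills = 0;

    SpillTrigger(long bufferSize, double spillPercent) {
        this.threshold = (long) (bufferSize * spillPercent);
    }

    // Adds one serialized record of the given size to the buffer.
    void collect(long recordSize) {
        used += recordSize;
        if (used >= threshold) {
            spills++;  // the buffered data would be sorted and written to a spill file
            used = 0;  // that space can then be reused
        }
    }

    long threshold() {
        return threshold;
    }

    int spillCount() {
        return spills;
    }
}
```

  With the defaults, new SpillTrigger(100L * 1024 * 1024, 0.80) gives a threshold of 80 MB (83,886,080 bytes).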

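  6. Before each spill, the buffered records are sorted by partition and then by key, and the resulting spill files are later merged into one partitioned, sorted file (this is part of the shuffle mechanism, which I will explain in detail in my next post);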
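  The two halves of step 6, sorting each spill by (partition, key) and then merging the sorted spill files, can be sketched in plain Java. This is a minimal model, assuming String keys; Hadoop sorts the buffer with quicksort by default and merges on disk, while this sketch uses List.sort and an in-memory k-way merge with a PriorityQueue. The class name SpillSortMerge is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of map-side sort-then-merge (SpillSortMerge is hypothetical).
class SpillSortMerge {
    static final class Rec {
        final int partition;
        final String key;

        Rec(int partition, String key) {
            this.partition = partition;
            this.key = key;
        }

        @Override
        public String toString() {
            return partition + ":" + key;
        }
    }

    // Order used both for each spill and for the final merged file.
    static final Comparator<Rec> ORDER =
            Comparator.comparingInt((Rec r) -> r.partition).thenComparing((Rec r) -> r.key);

    // Sort one buffer's worth of records before it is spilled.
    static List<Rec> sortSpill(List<Rec> buffer) {
        List<Rec> copy = new ArrayList<>(buffer);
        copy.sort(ORDER);
        return copy;
    }

    // K-way merge of already-sorted spills into one sorted output.
    static List<Rec> merge(List<List<Rec>> spills) {
        // Heap entry: {spill index, position within that spill}
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> ORDER.compare(spills.get(a[0]).get(a[1]), spills.get(b[0]).get(b[1])));
        for (int i = 0; i < spills.size(); i++) {
            if (!spills.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Rec> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<Rec> run = spills.get(top[0]);
            out.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return out;
    }
}
```

  Because every spill is already sorted, the merge only ever compares the heads of the runs; the ReduceTask's merge of fetched map outputs in step 8 follows the same idea.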

  7. The partitioned, sorted data can optionally be pre-aggregated by a Combiner and are then written to the local disk;
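  The order job in Section I does not set a Combiner, so the following uses a hypothetical word-count style sum to illustrate step 7. It is a minimal sketch, assuming the run is already sorted by key so that equal keys are adjacent; CombinerSketch and KV are hypothetical names. A Combiner may run zero, one or several times, so it must not change the final result, which a sum satisfies.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a sum combiner on one sorted spill run (CombinerSketch and KV are hypothetical).
class CombinerSketch {
    static final class KV {
        final String key;
        final int value;

        KV(String key, int value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // Input must be sorted by key, so equal keys are adjacent.
    static List<KV> combine(List<KV> sortedRun) {
        List<KV> out = new ArrayList<>();
        for (KV kv : sortedRun) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).key.equals(kv.key)) {
                // Same key as the previous record: fold the values together
                out.set(last, new KV(kv.key, out.get(last).value + kv.value));
            } else {
                out.add(kv);
            }
        }
        return out;
    }
}
```

  Applying combine() a second time to its own output returns the same records, which is the property that makes it safe for Hadoop to invoke a Combiner an unpredictable number of times.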

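  8. Each ReduceTask fetches its partition of the MapTasks' output from their local disks (copying may begin once some MapTasks have finished, but reduce() only runs after all MapTasks are done), then merges the files with a merge sort. The grouping comparator from Section I comes into play after this merge: it decides which consecutive keys are passed to the same reduce() call;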

  9. The Reducer processes one group of data at a time, and the default TextOutputFormat writes the results to the output files.
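  How TextOutputFormat turns each (key, value) pair into a line can be sketched as below. This is a simplified model of its record writer, assuming the default tab separator (configurable via mapreduce.output.textoutputformat.separator); in the sketch a Java null stands in for NullWritable, which Hadoop treats the same way. The class name TextLineFormat is hypothetical.

```java
// Sketch of TextOutputFormat's line layout (TextLineFormat is hypothetical).
// A null key or value (Hadoop also treats NullWritable this way) is simply left out.
class TextLineFormat {
    static String format(Object key, Object value, String separator) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return null; // nothing is written for this record
        }
        StringBuilder sb = new StringBuilder();
        if (!nullKey) {
            sb.append(key);
        }
        if (!nullKey && !nullValue) {
            sb.append(separator); // separator only between two present parts
        }
        if (!nullValue) {
            sb.append(value);
        }
        return sb.append('\n').toString();
    }
}
```

  In the order job the value is NullWritable, so each output line is just the key's toString(), i.e. the order id and price separated by a tab.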