
MapReduce Serialization and Flow-Statistics Program Development

程序员文章站 2022-06-11 08:35:59

I. Hadoop Data Types for Serialization

  Java type => Hadoop type

  int        IntWritable

  float      FloatWritable

  long       LongWritable

  double     DoubleWritable

  String     Text

  boolean    BooleanWritable

  byte       ByteWritable

  Map        MapWritable

  array      ArrayWritable

II. Hadoop Serialization

  1. What is serialization?

   In Java, the standard serialization interface is Serializable. Java's built-in serialization is a heavyweight framework: a serialized object carries a lot of extra information (checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to transmit over a network. For this reason, Hadoop developed its own lightweight serialization framework, Writable.

      Serialization converts objects in memory into a byte sequence, for storage and network transfer;

   deserialization converts a received byte sequence, or persisted data on disk, back into objects in memory.
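The two directions can be sketched in plain Java with no Hadoop dependency; this is essentially what a Writable's write() and readFields() do. The field names mirror the FlowBean class below; the class and method names here are illustrative, not part of any Hadoop API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RoundTrip {
    // Serialization: write the raw fields to a byte stream, as FlowBean.write() does.
    static byte[] serialize(long upFlow, long dwFlow, long flowSum) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
        return bos.toByteArray();
    }

    // Deserialization: read the fields back in exactly the order they were written.
    static long[] deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return new long[]{in.readLong(), in.readLong(), in.readLong()};
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(100, 200, 300);
        long[] fields = deserialize(bytes);
        System.out.println(bytes.length + " bytes, fields restored: "
                + fields[0] + ", " + fields[1] + ", " + fields[2]);
    }
}
```

Note that the stream contains no headers or type information, so the read order must match the write order exactly.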

  2. How to think about serialization (my own understanding; corrections welcome)

    In the flow-statistics example below, the wrapper class FlowBean implements the Writable interface and defines the fields upFlow, dwFlow, and flowSum;

    when a FlowBean is created in the Mapper or Reducer, memory is allocated for the object, and objects in that form cannot be transmitted efficiently over the network. At that point FlowBean's serialization method (write) converts the object into a byte sequence that is easy to store and transmit;

    when a Mapper or Reducer later receives such a byte sequence, from the network or from disk, FlowBean's deserialization method (readFields) converts it back into an object in memory.

  3. Characteristics of serialization

   Serialization and deserialization occur constantly in distributed data processing; for example, Hadoop's inter-node communication is implemented with remote procedure calls (RPC), and that process requires serialization.

  Characteristics: 1) compact;

     2) fast;

     3) extensible;

     4) interoperable.
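The "compact" point can be made concrete with a quick comparison, again in plain Java with no Hadoop dependency: serializing three longs through Java's ObjectOutputStream versus writing them raw, the way a Writable does. The FlowRecord class here is a made-up Serializable stand-in for FlowBean, used only for the size comparison:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CompactDemo {
    // Hypothetical Serializable counterpart of FlowBean, for size comparison only.
    static class FlowRecord implements Serializable {
        long upFlow = 1024, dwFlow = 2048, flowSum = 3072;
    }

    // Size of the object under Java's built-in serialization
    // (stream header, class descriptor, field metadata, then the values).
    static int javaSerializedSize() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new FlowRecord());
        }
        return bos.size();
    }

    // Size of the same data written raw, Writable-style: 3 longs = 24 bytes.
    static int rawSize() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeLong(1024);
            dos.writeLong(2048);
            dos.writeLong(3072);
        }
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Java serialization: " + javaSerializedSize() + " bytes");
        System.out.println("Writable-style raw: " + rawSize() + " bytes");
    }
}
```

The raw encoding is exactly 24 bytes, while Java serialization adds the extra metadata described above, which is the overhead Writable avoids.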

III. A MapReduce Flow-Statistics Example

  1. Code

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author: princesshug
 * @date: 2019/3/23, 23:38
 * @blog: https://www.cnblogs.com/hellobigtable/
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDwFlow() {
        return dwFlow;
    }

    public void setDwFlow(long dwFlow) {
        this.dwFlow = dwFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    // A no-arg constructor is required so the framework can instantiate
    // the bean by reflection before calling readFields().
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    /**
     * Serialization: write the fields to the output stream.
     * @param out output stream
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    /**
     * Deserialization: read the fields back in the same order they were written.
     * @param in input stream
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }
}

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read the input line
        String line = value.toString();

        // Split into tab-separated fields
        String[] fields = line.split("\t");

        // Extract the phone number and the flow values
        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dwFlow = Long.parseLong(fields[fields.length - 2]);

        // Emit phone number -> flow bean
        context.write(new Text(phoneNum), new FlowBean(upFlow, dwFlow));
    }
}
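The field indices in the Mapper assume a tab-separated log line whose second column is the phone number and whose last three columns are up-flow, down-flow, and a status code. The sample line below is hypothetical, made up purely to match that layout, and the parsing mirrors the Mapper's logic:

```java
public class ParseDemo {
    // Parse one log line the same way the Mapper does and
    // return "phone \t upFlow \t dwFlow".
    static String parse(String line) {
        String[] fields = line.split("\t");
        String phoneNum = fields[1];                             // second column
        long upFlow = Long.parseLong(fields[fields.length - 3]); // third from the end
        long dwFlow = Long.parseLong(fields[fields.length - 2]); // second from the end
        return phoneNum + "\t" + upFlow + "\t" + dwFlow;
    }

    public static void main(String[] args) {
        // Hypothetical line: id, phone, ip, host, upFlow, dwFlow, status
        String line = "1\t13512345678\t192.168.1.100\twww.example.com\t2481\t24681\t200";
        System.out.println(parse(line));
    }
}
```

Counting from the end of the line makes the parsing robust when the middle columns (ip, host) are sometimes missing, which is why the Mapper uses fields.length - 3 rather than a fixed index.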

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // Aggregate the flow values for this phone number
        long upFlowSum = 0;
        long dwFlowSum = 0;
        for (FlowBean f : values) {
            upFlowSum += f.getUpFlow();
            dwFlowSum += f.getDwFlow();
        }
        // Emit the totals
        context.write(key, new FlowBean(upFlowSum, dwFlowSum));
    }
}


import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Partition by the first three digits of the phone number
        String phonePrefix = key.toString().substring(0, 3);
        if ("135".equals(phonePrefix)) {
            return 0;
        } else if ("137".equals(phonePrefix)) {
            return 1;
        } else if ("138".equals(phonePrefix)) {
            return 2;
        } else if ("139".equals(phonePrefix)) {
            return 3;
        }
        // All other prefixes go to the fifth partition
        return 4;
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Load the configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the driver class
        job.setJarByClass(FlowCountDriver.class);

        // Set the Mapper class and its output key/value types
        job.setMapperClass(FlowCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Set the Reducer class and its output key/value types
        job.setReducerClass(FlowCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the custom partitioner; 5 reduce tasks to match the 5 partitions
        job.setPartitionerClass(FlowPartitioner.class);
        job.setNumReduceTasks(5);

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("g:\\mapreduce\\flow\\in"));
        FileOutputFormat.setOutputPath(job, new Path("g:\\mapreduce\\flow\\inpartitionout"));

        // Run the job and report the result
        if (job.waitForCompletion(true)) {
            System.out.println("Job finished successfully!");
        } else {
            System.out.println("Job failed!");
        }
    }
}