A Brief Look at Hadoop's Writable Serialization Interface, with Hands-on Practice


I. Introduction

    When developing code on Hadoop, for example MapReduce jobs, you sometimes need to pass bean objects around inside the framework, and that requires serialization.

    Hadoop is written in Java, so could we simply use Java's own Serializable mechanism?

    Hadoop in fact ships with its own serialization interface, Writable. What are its advantages, and how is it implemented?

    Below I share my understanding of these questions and walk through them hands-on with a simple MapReduce job.

II. Java Serializable

First, let's look at plain Java serialization.

1. Define the bean
package com.lee.writable;

import java.io.Serializable;

public class MyInfo implements Serializable {
    /** Declare serialVersionUID explicitly: the declared value is written into the serialized
     *  stream and checked for consistency when the object is deserialized.
     *  If it is omitted, Java derives a UID from the class file automatically, so any later
     *  change to the class makes the generated UID mismatch and deserialization fails.
     */
    private static final long serialVersionUID = 15555555555789L;

    String name;

    int age;

    // mark tel as transient if it should not be serialized; after deserialization it would then be null
    //private transient String tel;
    private String tel;


    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    @Override
    public String toString() {
        return "MyInfo{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", tel='" + tel + '\'' +
                '}';
    }
}
2. Serialization utility class
package com.lee.writable;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class TestJavaSerializableUtils {

    // serialize: write the object to a file
    public static void saveObject(Object object) throws Exception {
        // try-with-resources closes the streams in the right order, even if an exception is thrown
        try (FileOutputStream fos = new FileOutputStream("hadoop/src/file/testJavaSer.txt");
             ObjectOutputStream out = new ObjectOutputStream(fos)) {
            out.writeObject(object);
        }
    }

    // deserialize: read the object back from the file
    public static Object readObject() throws Exception {
        try (FileInputStream fis = new FileInputStream("hadoop/src/file/testJavaSer.txt");
             ObjectInputStream in = new ObjectInputStream(fis)) {
            return in.readObject();
        }
    }

}
3. Main class for testing
package com.lee.writable;


public class TestJavaSerializable {

    public static void main(String[] args) {

        long begin = System.currentTimeMillis();
        // populate the bean
        MyInfo info = new MyInfo();
        info.setAge(18);
        info.setName("lee");
        info.setTel("155****0831");
	
        // serialize
        try {
            TestJavaSerializableUtils.saveObject(info);
        } catch (Exception e) {
            System.out.println("保存时异常:" + e.getMessage());
        }

        // deserialize
        MyInfo myInfo;
        try {
            myInfo = (MyInfo) TestJavaSerializableUtils.readObject();
            System.out.println(myInfo);
        } catch (Exception e) {
            System.out.println("读取时异常:" + e.getMessage());
        }


        long end = System.currentTimeMillis();

        System.out.println("耗时:" + (end - begin));
    }


}


Drawbacks of JDK serialization

	1. The serialized byte stream is large

	JDK serialization produces a comparatively bulky byte array (it adds checksums, a stream header, and the whole class hierarchy), so the serialized data takes
	more memory and storage, and sending it over the network consumes more bandwidth, which hurts performance.


	2. It cannot cross language boundaries

	This is almost a fatal flaw. Cross-process service calls usually have to interoperate with other languages, and JDK serialization cannot do that, because it
	uses a private protocol internal to the Java language, which makes deserializing the data from any other language a serious obstacle.
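
To get a feel for the first point, here is a minimal sketch of my own (not from the original article): it serializes the MyInfo bean from section II into an in-memory byte array and prints the size. The raw field data ("lee", 18 and a short phone string) is only a couple of dozen bytes, but the Java-serialized stream is typically well over a hundred bytes because of the stream header, class descriptor and field metadata it carries.

package com.lee.writable;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

public class SerializedSizeDemo {
    public static void main(String[] args) throws Exception {
        MyInfo info = new MyInfo();
        info.setName("lee");
        info.setAge(18);
        info.setTel("155****0831");

        // serialize into memory so the byte stream can be measured directly
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(info);
        }

        System.out.println("Java-serialized size: " + baos.size() + " bytes");
    }
}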

III. Hadoop Writable

	Java serialization is heavyweight and unfriendly to network transfer, and the main bottlenecks of a big-data stack are disk performance and network I/O anyway.
	Besides, an implementation that lives inside the framework makes it easier to deal with problems caused by Hadoop version upgrades. So the Hadoop framework has
	its own serialization mechanism: the Writable interface.

	Its characteristics:
		1. Compact: a small footprint that saves bandwidth;
		2. Fast: serialization and deserialization are cheap;
		3. Extensible (backward compatible): newer APIs can still read data written in older formats;
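
To see the "compact" claim in practice, here is a small sketch of my own (not part of the original article). It writes an IntWritable and a Text into Hadoop's in-memory DataOutputBuffer: the IntWritable occupies exactly 4 bytes, and the Text occupies its UTF-8 length plus a small varint length prefix, far less than what Java serialization would produce for the equivalent Integer/String objects.

package com.lee.writable;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableSizeDemo {
    public static void main(String[] args) throws Exception {
        DataOutputBuffer buffer = new DataOutputBuffer();

        // an IntWritable is written as a fixed 4-byte int
        new IntWritable(18).write(buffer);
        System.out.println("IntWritable(18): " + buffer.getLength() + " bytes");

        // a Text is written as a varint length followed by the UTF-8 bytes
        buffer.reset();
        new Text("lee").write(buffer);
        System.out.println("Text(\"lee\"): " + buffer.getLength() + " bytes");
    }
}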

Implementing Writable in practice

1. Test data and processing requirements

# data format
# id, username, platform, original price, discount, status code

# expected output
# username	original price	discount	final price
# hdfs		1000	18		982

1,13568436656,Android,2481,81,200
2,13568436656,Android,1116,16,200
3,kylin,PC,1000,152,200
4,kylin,Android,500,0,404
5,sqoop,IOS,5000,206,200
6,jd_lzs,Android,8000,7999,200
7,yarn,IOS,1116,16,200
8,hello_world,IOS,3156,56,200
9,shihaiyang,PC,240,0,200
10,wangerxin,PC,6960,200,200
11,xurui,Android,3659,300,200
12,luomeng,IOS,1938,38,500
13,luomeng,Android,918,7,200
14,hive,Android,180,180,200
15,dahaige,Android,1938,38,200
16,hbase,Android,3008,8,404
17,hello_world,Android,7335,35,404
18,hujie,Android,9531,1,200
19,hujie,Android,11058,1000,200
20,jd_lzs,Android,120,0,200
21,hdfs,PC,1000,18,200
22,hive,PC,1500,0,200
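
As a concrete example, user 13568436656 has two rows with status code 200, so the job should emit an original price of 2481 + 1116 = 3597, a discount of 81 + 16 = 97, and a final price of 3597 - 97 = 3500; the three rows with status code 404 (ids 4, 16 and 17) are dropped by the mapper.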

2. Writing the MapReduce code

0. Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>learning</artifactId>
        <groupId>com.lee</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hadoop</artifactId>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>d:/jdk1.8/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin </artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1. The bean that holds the spending statistics
package com.lee.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * (1) The class must implement the Writable interface.
 * (2) Deserialization instantiates the object via reflection, so a no-arg constructor is required.
 * (3) Override the serialization method:
 *     public void write(DataOutput out) throws IOException { ... }
 * (4) Override the deserialization method:
 *     public void readFields(DataInput in) throws IOException { ... }
 * (5) The fields must be deserialized in exactly the same order in which they were serialized.
 * (6) To make the result readable in the output file, override toString().
 */

/**
 * Bean for the payment statistics
 * */
public class ShoppingBean implements Writable {
    // every numeric field in the test file is an int
    private int origin_price;
    private int discount;
    private int pay_money;

    // no-arg constructor, needed by reflection during deserialization
    public ShoppingBean() {
        super();
    }

    // convenience constructor used by the reducer to build its output value
    public ShoppingBean(int para_origin, int para_discount) {
        super();
        this.origin_price = para_origin;
        this.discount = para_discount;
        this.pay_money = para_origin - para_discount;
    }

    // serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(origin_price);
        dataOutput.writeInt(discount);
        dataOutput.writeInt(pay_money);

    }

    // deserialization; the read order must match the write order
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.origin_price = dataInput.readInt();
        this.discount = dataInput.readInt();
        this.pay_money = dataInput.readInt();

    }

    //getter and setter
    // toString()


    public int getOrigin_price() {
        return origin_price;
    }

    public void setOrigin_price(int origin_price) {
        this.origin_price = origin_price;
    }

    public int getDiscount() {
        return discount;
    }

    public void setDiscount(int discount) {
        this.discount = discount;
    }

    public int getPay_money() {
        return pay_money;
    }

    public void setPay_money(int pay_money) {
        this.pay_money = pay_money;
    }

    @Override
    public String toString() {
        return "ShoppingBean{" +
                "origin_price=" + origin_price +
                ", discount=" + discount +
                ", pay_money=" + pay_money +
                '}';
    }
}
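
Before wiring the bean into a job, it can be handy to check that write() and readFields() round-trip correctly. The following is a small sketch of my own (not from the article) that uses the DataOutputBuffer/DataInputBuffer helpers from hadoop-common: it serializes a ShoppingBean into 12 bytes (three ints) and reads it back in the same order.

package com.lee.writable;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class ShoppingBeanRoundTripTest {
    public static void main(String[] args) throws Exception {
        // serialize: write() puts the three ints into the buffer, 12 bytes in total
        ShoppingBean original = new ShoppingBean(1000, 18);
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);
        System.out.println("serialized length: " + out.getLength() + " bytes");

        // deserialize: readFields() reads the ints back in the same order they were written
        ShoppingBean copy = new ShoppingBean();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);

        System.out.println(copy); // ShoppingBean{origin_price=1000, discount=18, pay_money=982}
    }
}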

2. Writing the Mapper
package com.lee.writable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ShoppingMapper extends Mapper<LongWritable, Text, Text, ShoppingBean> {

    // map output key/value objects, created once and reused for every record
    Text k = new Text();
    ShoppingBean shoppingBean = new ShoppingBean();

    // (byte offset of the line, the line's contents, the output context)
    @Override
    protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
        // convert the line to a String
        String line = values.toString();
        // split on commas
        String[] iterms = line.split(",");
        // filter out records whose status code is 404
        if (iterms[iterms.length - 1].equals("404")) {
            return;
        }

        // populate the output key and value
        String username = iterms[1];
        k.set(username);

        int origin_price = Integer.parseInt(iterms[iterms.length - 3]);
        int discount = Integer.parseInt(iterms[iterms.length - 2]);

        shoppingBean.setOrigin_price(origin_price);
        shoppingBean.setDiscount(discount);

        context.write(k, shoppingBean);

    }
}
3. Writing the Reducer
package com.lee.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ShoppingReducer extends Reducer<Text, ShoppingBean, Text, ShoppingBean> {

    @Override
    protected void reduce(Text key, Iterable<ShoppingBean> values, Context context) throws IOException, InterruptedException {

        int sumOrigin = 0;
        int sumDiscount = 0;

        // values are already grouped by username; accumulate original price and discount separately
        for (ShoppingBean value : values) {
            sumOrigin += value.getOrigin_price();
            sumDiscount += value.getDiscount();
        }

        // build the output bean
        ShoppingBean bean = new ShoppingBean(sumOrigin, sumDiscount);
        // emit the result
        context.write(key, bean);
    }
}
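
One detail worth noting: during the reduce() iteration Hadoop reuses the same ShoppingBean instance for every value, so reading the primitive fields out of it and accumulating them, as the loop above does, is the safe pattern; keeping references to the bean objects themselves would leave you with copies of only the last value.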
4. Writing the main (driver) class
package com.lee.writable;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;


public class ShoppingDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // wraps the various runtime parameters of the MapReduce job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // the jar that contains the job classes
        job.setJarByClass(ShoppingDriver.class);

        // mapper and reducer classes
        job.setMapperClass(ShoppingMapper.class);
        job.setReducerClass(ShoppingReducer.class);

        // map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ShoppingBean.class);

        // final (reducer) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(ShoppingBean.class);

        // input and output paths
        // use org.apache.hadoop.mapreduce.lib.input.FileInputFormat and
        // org.apache.hadoop.mapreduce.lib.output.FileOutputFormat,
        // not the classes of the same name from the old org.apache.hadoop.mapred package
        FileInputFormat.setInputPaths(job, new Path("hadoop/src/file/writable/data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hadoop/src/file/writable_out"));

        // submit the job with the parameters configured above and wait for it to finish

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
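
If you run the driver locally (for example straight from the IDE), note that the output directory hadoop/src/file/writable_out must not exist before the job starts: FileOutputFormat refuses to overwrite an existing path and the job fails with an "output directory already exists" error. With the default single reducer, the aggregated results land in part-r-00000 under that directory.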

5. Run the job: the results match the expectation

