A Brief Look at Hadoop's Writable Serialization Interface, with a Hands-On Example
I. Introduction
When developing Hadoop code such as MapReduce jobs, you sometimes need to pass bean objects around inside the framework, and that requires serialization. Hadoop is written in Java, so could we simply use Java's own Serializable?
Hadoop provides its own serialization interface, Writable. What advantages does it have, and how is it actually used?
In the following sections I share my understanding, and then put it into practice with a simple MapReduce job.
II. Java – Serializable
Let's first look at plain Java serialization.
1. Build the bean class
package com.lee.writable;
import java.io.Serializable;
public class MyInfo implements Serializable {
/** If serialVersionUID is declared explicitly, its value is written into the serialized file,
 * and during deserialization the JVM checks that the value in the file still matches the class.
 * If it is not declared, Java derives one from the class file automatically, so any later change
 * to the class makes the generated UID mismatch and deserialization fails.
 */
private static final long serialVersionUID = 15555555555789L;
String name;
int age;
//To keep tel out of the serialized stream, mark it transient; after deserialization tel comes back as null (see the sketch after this class)
//private transient String tel;
private String tel;
public static long getSerialVersionUID() {
return serialVersionUID;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getTel() {
return tel;
}
public void setTel(String tel) {
this.tel = tel;
}
@Override
public String toString() {
return "MyInfo{" +
"name='" + name + '\'' +
", age=" + age +
", tel='" + tel + '\'' +
'}';
}
}
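As a side note on the transient comment above, here is a tiny sketch of my own (the TransientDemo class is purely illustrative and not part of the original post) showing that a transient field is skipped during serialization and comes back as null:
package com.lee.writable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class TransientDemo implements Serializable {
    String name = "lee";
    transient String tel = "155****0831"; // excluded from the serialized stream
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(new TransientDemo());
        }
        TransientDemo copy = (TransientDemo) new ObjectInputStream(
                new ByteArrayInputStream(baos.toByteArray())).readObject();
        System.out.println(copy.name); // lee
        System.out.println(copy.tel);  // null -- the transient field was never written
    }
}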
2. Serialization utility class
package com.lee.writable;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
public class TestJavaSerializableUtils {
// Save the object (serialize)
public static void saveObject(Object object) throws Exception {
// try-with-resources closes both streams safely and in the right order
try (FileOutputStream fos = new FileOutputStream("hadoop/src/file/testJavaSer.txt");
ObjectOutputStream out = new ObjectOutputStream(fos)) {
out.writeObject(object);
}
}
// Read the object back (deserialize)
public static Object readObject() throws Exception {
try (FileInputStream fis = new FileInputStream("hadoop/src/file/testJavaSer.txt");
ObjectInputStream in = new ObjectInputStream(fis)) {
return in.readObject();
}
}
}
3. Test driver class
package com.lee.writable;
public class TestJavaSerializable {
public static void main(String[] args) {
long begin = System.currentTimeMillis();
//populate the test data
MyInfo info = new MyInfo();
info.setAge(18);
info.setName("lee");
info.setTel("155****0831");
// serialize
try {
TestJavaSerializableUtils.saveObject(info);
} catch (Exception e) {
System.out.println("Exception while saving: " + e.getMessage());
}
// deserialize
MyInfo myInfo;
try {
myInfo = (MyInfo) TestJavaSerializableUtils.readObject();
System.out.println(myInfo);
} catch (Exception e) {
System.out.println("Exception while reading: " + e.getMessage());
}
long end = System.currentTimeMillis();
System.out.println("Elapsed (ms): " + (end - begin));
}
}
Drawbacks of JDK serialization
1. The serialized stream is large
The byte array produced by JDK serialization carries extra checks, a stream header and class-hierarchy metadata, so it occupies noticeably more storage and memory, and the stream consumes more bandwidth when sent over the network, which hurts performance (measured in the sketch below).
2. It is not cross-language
This is almost a fatal flaw: cross-process service calls usually need to interoperate between different languages, and JDK serialization cannot do that, because it uses Java's own private protocol, which makes deserializing the data in other languages a serious obstacle.
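To make the first point concrete, here is a minimal sketch of my own (reusing the MyInfo bean above; the JdkStreamSize class is not part of the original project) that measures how many bytes JDK serialization produces for one small object:
package com.lee.writable;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
public class JdkStreamSize {
    public static void main(String[] args) throws Exception {
        MyInfo info = new MyInfo();
        info.setName("lee");
        info.setAge(18);
        info.setTel("155****0831");
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(info); // stream header + class descriptors + field data
        }
        // the payload is just a short name, an int and a phone number, yet the stream is far larger
        System.out.println("JDK-serialized size: " + baos.size() + " bytes");
    }
}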
III. Hadoop – Writable
Java serialization is heavyweight and not friendly to network transfer, while the bottlenecks of big-data systems are mostly disk performance and network I/O. A serialization mechanism implemented inside the framework also makes it easier to handle compatibility issues caused by Hadoop version upgrades. So the Hadoop framework ships its own serialization interface: Writable.
Characteristics:
1. Compact: small footprint, saves bandwidth (see the small sketch after this list);
2. Fast: serialization and deserialization are cheap;
3. Extensible (backward compatible): new APIs can still read data written in older formats.
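As a rough illustration of the compactness claim, here is a sketch of my own (assuming hadoop-common is on the classpath): a Writable writes nothing but its raw field bytes, with no stream header or class metadata, so an IntWritable serializes to exactly 4 bytes.
package com.lee.writable;
import org.apache.hadoop.io.IntWritable;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class WritableSizeDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            new IntWritable(2481).write(out); // write() simply calls out.writeInt(...)
        }
        System.out.println("Writable size: " + baos.size() + " bytes"); // 4
    }
}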
Implementing Writable in practice
1. Sample data and processing requirements
# Data format
# id, username, platform, original price, discount, network status code
# Expected output
# username  original_price  discount  final_price
# hdfs 1000 18 982
1,13568436656,Android,2481,81,200
2,13568436656,Android,1116,16,200
3,kylin,PC,1000,152,200
4,kylin,Android,500,0,404
5,sqoop,IOS,5000,206,200
6,jd_lzs,Android,8000,7999,200
7,yarn,IOS,1116,16,200
8,hello_world,IOS,3156,56,200
9,shihaiyang,PC,240,0,200
10,wangerxin,PC,6960,200,200
11,xurui,Android,3659,300,200
12,luomeng,IOS,1938,38,500
13,luomeng,Android,918,7,200
14,hive,Android,180,180,200
15,dahaige,Android,1938,38,200
16,hbase,Android,3008,8,404
17,hello_world,Android,7335,35,404
18,hujie,Android,9531,1,200
19,hujie,Android,11058,1000,200
20,jd_lzs,Android,120,0,200
21,hdfs,PC,1000,18,200
22,hive,PC,1500,0,200
2. Writing the MapReduce code
0. Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>learning</artifactId>
<groupId>com.lee</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hadoop</artifactId>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>d:/jdk1.8/lib/tools.jar</systemPath>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
1. The bean that aggregates spending
package com.lee.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * (1) The class must implement the Writable interface.
 * (2) Deserialization instantiates the object reflectively through the no-arg constructor, so one must exist.
 * (3) Override the serialization method:
 *
 * @Override public void write(DataOutput out) throws IOException {
 * }
 * (4) Override the deserialization method:
 * @Override public void readFields(DataInput in) throws IOException {
 * }
 * (5) Fields must be deserialized in exactly the same order they were serialized.
 * (6) Override toString() so the results written to the output files are readable.
 */
/**
 * Bean for the payment statistics
 * */
public class ShoppingBean implements Writable {
//all numeric fields in the test data are ints
private int origin_price;
private int discount;
private int pay_money;
//no-arg constructor, required for reflective deserialization
public ShoppingBean() {
super();
}
//convenience constructor used by the reducer to package its output
public ShoppingBean(int para_origin, int para_discount) {
super();
this.origin_price = para_origin;
this.discount = para_discount;
this.pay_money = para_origin - para_discount;
}
//serialization
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(origin_price);
dataOutput.writeInt(discount);
dataOutput.writeInt(pay_money);
}
//deserialization; fields must be read in the same order they were written
@Override
public void readFields(DataInput dataInput) throws IOException {
this.origin_price = dataInput.readInt();
this.discount = dataInput.readInt();
this.pay_money = dataInput.readInt();
}
//getter and setter
// toString()
public int getOrigin_price() {
return origin_price;
}
public void setOrigin_price(int origin_price) {
this.origin_price = origin_price;
}
public int getDiscount() {
return discount;
}
public void setDiscount(int discount) {
this.discount = discount;
}
public int getPay_money() {
return pay_money;
}
public void setPay_money(int pay_money) {
this.pay_money = pay_money;
}
@Override
public String toString() {
return "ShoppingBean{" +
"origin_price=" + origin_price +
", discount=" + discount +
", pay_money=" + pay_money +
'}';
}
}
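Before wiring the bean into the job, a small round-trip check of my own (the ShoppingBeanRoundTrip class is not part of the original post) can verify that write() and readFields() agree on the field order:
package com.lee.writable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class ShoppingBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        ShoppingBean original = new ShoppingBean(1000, 18); // pay_money = 982
        // serialize: three ints, 12 bytes in total
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(baos));
        // deserialize into a fresh instance created via the no-arg constructor
        ShoppingBean copy = new ShoppingBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        System.out.println(copy);                                 // same field values as the original
        System.out.println("bytes on the wire: " + baos.size());  // 12
    }
}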
2. The Mapper
package com.lee.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ShoppingMapper extends Mapper<LongWritable, Text, Text, ShoppingBean> {
//reusable objects for the map output key and value
Text k = new Text();
ShoppingBean shoppingBean = new ShoppingBean();
//(byte offset of the line, the line contents, output context)
@Override
protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
//convert the line to a String
String line = values.toString();
//split on commas
String[] iterms = line.split(",");
//skip records whose network status code is 404
if (iterms[iterms.length - 1].equals("404")) {
return;
}
//assemble the output key and value
String username = iterms[1];
k.set(username);
int origin_price = Integer.parseInt(iterms[iterms.length - 3]);
int discount = Integer.parseInt(iterms[iterms.length - 2]);
shoppingBean.setOrigin_price(origin_price);
shoppingBean.setDiscount(discount);
context.write(k, shoppingBean);
}
}
3. The Reducer
package com.lee.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ShoppingReducer extends Reducer<Text, ShoppingBean, Text, ShoppingBean> {
@Override
protected void reduce(Text key, Iterable<ShoppingBean> values, Context context) throws IOException, InterruptedException {
int sumOrigin = 0;
int sumDiscount = 0;
// values are already grouped by username here; accumulate the original prices and the discounts separately
for (ShoppingBean value : values) {
sumOrigin += value.getOrigin_price();
sumDiscount += value.getDiscount();
}
//package the totals
ShoppingBean bean = new ShoppingBean(sumOrigin, sumDiscount);
//emit the result
context.write(key, bean);
}
}
4. The main (driver) class
package com.lee.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ShoppingDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//holds the job's runtime configuration
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//the jar containing the job classes
job.setJarByClass(ShoppingDriver.class);
//mapper and reducer classes
job.setMapperClass(ShoppingMapper.class);
job.setReducerClass(ShoppingReducer.class);
//map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ShoppingBean.class);
//final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ShoppingBean.class);
//input and output paths
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// note: these are not the FileInputFormat/FileOutputFormat classes from the older org.apache.hadoop.mapred package
FileInputFormat.setInputPaths(job, new Path("hadoop/src/file/writable/data.txt"));
FileOutputFormat.setOutputPath(job, new Path("hadoop/src/file/writable_out"));
// submit the configured job and wait for it to finish
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
5. Run the job: the results match the expected output.