Mapreduce的序列化和流量统计程序开发
一、hadoop数据序列化的数据类型
java数据类型 => hadoop数据类型
int intwritable
float floatwritable
long longwritable
double doublewritable
string text
boolean booleanwritable
byte bytewritable
map mapwritable
array arraywritable
二、hadoop的序列化
1.什么是序列化?
在java中,序列化接口是serializable,它下面又实现了很多的序列化接口,所以java的序列化是一个重量级的序列化框架,一个对象被java序列化之后会附带很多额外的信息(校验信息、header、继承体系等),不便于在网络中进行高效的传输,所以hadoop开发了一套自己的序列化框架——writable。
序列化就是把内存当中的对象,转化为字节序列以便于存储和网络传输;
反序列化是将收到的字节序列或硬盘当中的持续化数据,转换成内存中的对象。
2.序列化的理解方法(自己悟的,不对勿喷~~)
比如下面流量统计案例中,流量的封装类flowbean实现了writable接口,其中定义了变量upflow、dwflow、flowsum;
在mapper和reducer类中初始化封装类flowbean时,内存会分配空间加载这些对象,而这些对象不便于在网络中高效的传输,这是封装类flowbean中的序列化方法将这些对象转换为字节序列,方便了存储和传输;
当mapper或reducer需要将这些对象的字节序列写出到磁盘时,封装类flowbean中的反序列化方法将字节序列转换为对象,然后写道磁盘中。
3.序列化特点
序列化与反序列化时分布式数据处理当中经常会出现的,比如hadoop通信是通过远程调用(rpc)实现的,这个过程就需要序列化。
特点:1)紧凑;
2)快速
3)可扩展
4)可互操作
三、mapreduce的流量统计程序案例
1.代码
/**
* @author: princesshug
* @date: 2019/3/23, 23:38
* @blog: https://www.cnblogs.com/hellobigtable/
*/
public class flowbean implements writable {
private long upflow;
private long dwflow;
private long flowsum;
public long getupflow() {
return upflow;
}
public void setupflow(long upflow) {
this.upflow = upflow;
}
public long getdwflow() {
return dwflow;
}
public void setdwflow(long dwflow) {
this.dwflow = dwflow;
}
public long getflowsum() {
return flowsum;
}
public void setflowsum(long flowsum) {
this.flowsum = flowsum;
}
public flowbean() {
}
public flowbean(long upflow, long dwflow) {
this.upflow = upflow;
this.dwflow = dwflow;
this.flowsum = upflow + dwflow;
}
/**
* 序列化
* @param out 输出流
* @throws ioexception
*/
@override
public void write(dataoutput out) throws ioexception {
out.writelong(upflow);
out.writelong(dwflow);
out.writelong(flowsum);
}
/**
* 反序列化
* @param in
* @throws ioexception
*/
@override
public void readfields(datainput in) throws ioexception {
upflow = in.readlong();
dwflow = in.readlong();
flowsum = in.readlong();
}
@override
public string tostring() {
return upflow + "\t" + dwflow + "\t" + flowsum;
}
}
public class flowcountmapper extends mapper<longwritable, text,text,flowbean> {
@override
protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception {
//获取数据
string line = value.tostring();
//切分数据
string[] fields = line.split("\t");
//封装数据
string phonenum = fields[1];
long upflow = long.parselong(fields[fields.length - 3]);
long dwflow = long.parselong(fields[fields.length - 2]);
//发送数据
context.write(new text(phonenum),new flowbean(upflow,dwflow));
}
}
public class flowcountreducer extends reducer<text,flowbean,text,flowbean> {
@override
protected void reduce(text key, iterable<flowbean> values, context context) throws ioexception, interruptedexception {
//聚合数据
long upflow_sum = 0;
long dwflow_sum = 0;
for (flowbean f:values){
upflow_sum += f.getupflow();
dwflow_sum += f.getdwflow();
}
//发送数据
context.write(key,new flowbean(upflow_sum,dwflow_sum));
}
}
public class flowpartitioner extends partitioner<text,flowbean> {
@override
public int getpartition(text key, flowbean value, int i) {
//获取用来分区的电话号码前三位
string phonenum = key.tostring().substring(0, 3);
//设置分区逻辑
int partitionnum = 4;
if ("135".equals(phonenum)){
return 0;
}else if ("137".equals(phonenum)){
return 1;
}else if ("138".equals(phonenum)){
return 2;
}else if ("139".equals(phonenum)){
return 3;
}
return partitionnum;
}
}
public class flowcountdriver {
public static void main(string[] args) throws ioexception, classnotfoundexception, interruptedexception {
//获取配置,定义工具
configuration conf = new configuration();
job job = job.getinstance();
//设置运行类
job.setjarbyclass(flowcountdriver.class);
//设置mapper类及mapper输出数据类型
job.setmapperclass(flowcountmapper.class);
job.setmapoutputkeyclass(text.class);
job.setmapoutputvalueclass(flowbean.class);
//设置reducer类及其输出数据类型
job.setreducerclass(flowcountreducer.class);
job.setoutputkeyclass(text.class);
job.setoutputvalueclass(flowbean.class);
//设置自定义分区
job.setpartitionerclass(flowpartitioner.class);
job.setnumreducetasks(5);
//设置文件输入输出流
fileinputformat.setinputpaths(job,new path("g:\\mapreduce\\flow\\in"));
fileoutputformat.setoutputpath(job,new path("g:\\mapreduce\\flow\\inpartitionout"));
//返回运行完成
if (job.waitforcompletion(true)){
system.out.println("运行完毕!");
}else {
system.out.println("运行出错!");
}
}
}
上一篇: 自建简单又实用的动态域名管理系统