MapReduce Sorting (Global Sort, Partitioning plus Sorting, Combiner Optimization)
I. Categories of MR Sorting
1. Partial sort: MR sorts the output records by key, guaranteeing that the contents of every output file are internally sorted;
2. Global sort: the entire output forms a single total order, e.g., by using one reduce task (see the sketch after this list);
3. Auxiliary sort: after the first sort, the data is partitioned and then sorted once more;
4. Secondary sort: after one sort, the data is sorted again according to business logic.
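For item 2 above: the most direct way to get a global (total) sort is to run the job with a single reduce task, so every key flows through one sorted stream. A minimal sketch of the relevant driver line (assuming a Job object set up as in the driver shown later):

// With exactly one reduce task, the single output file is globally
// sorted by the map output key's compareTo order.
job.setNumReduceTasks(1);

For large data sets a single reducer becomes a bottleneck; Hadoop also ships org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner, which splits the key space into ranges so several reducers together still produce a global order.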
II. The MR Sorting Interface: WritableComparable
This interface extends Hadoop's Writable interface and Java's Comparable interface. Implementing it requires overriding three methods: write, readFields, and compareTo.
III. Sorting and Partitioning in the Flow-Statistics Case
/**
* @author: princesshug
* @date: 2019/3/24, 15:36
* @blog: https://www.cnblogs.com/hellobigtable/
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowSortBean implements WritableComparable<FlowSortBean> {
    private long upFlow;
    private long dwFlow;
    private long flowSum;

    // The empty constructor is required for Hadoop's reflection-based deserialization
    public FlowSortBean() {
    }

    public FlowSortBean(long upFlow, long dwFlow) {
        this.upFlow = upFlow;
        this.dwFlow = dwFlow;
        this.flowSum = upFlow + dwFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDwFlow() {
        return dwFlow;
    }

    public void setDwFlow(long dwFlow) {
        this.dwFlow = dwFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dwFlow);
        out.writeLong(flowSum);
    }

    // Deserialization: read the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dwFlow = in.readLong();
        flowSum = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dwFlow + "\t" + flowSum;
    }

    // Sort by total flow in descending order; never returning 0 means records
    // with equal totals are kept as distinct keys instead of being grouped
    @Override
    public int compareTo(FlowSortBean o) {
        return this.flowSum > o.getFlowSum() ? -1 : 1;
    }
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSortMapper extends Mapper<LongWritable, Text, FlowSortBean, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split into fields: phone number, up flow, down flow
        String[] fields = line.split("\t");
        // Parse the flow values
        long upFlow = Long.parseLong(fields[1]);
        long dwFlow = Long.parseLong(fields[2]);
        // Emit the bean as the key so the framework sorts by it; the phone number is the value
        context.write(new FlowSortBean(upFlow, dwFlow), new Text(fields[0]));
    }
}
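For reference (not shown in the original), the Mapper above assumes each input line is tab-separated as phone, up flow, down flow; a couple of made-up sample lines:

13568494652	2481	24681
13794413212	1527	2106

Each reducer output line is then the phone number followed by up flow, down flow, and total, sorted by total flow in descending order.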
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSortReducer extends Reducer<FlowSortBean, Text, Text, FlowSortBean> {
    @Override
    protected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Swap key and value so each output line is: phone number, then the flow fields
        context.write(values.iterator().next(), key);
    }
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {
    @Override
    public int getPartition(FlowSortBean key, Text value, int numPartitions) {
        // Route records by the first three digits of the phone number (the value)
        String phoneNum = value.toString().substring(0, 3);
        if ("135".equals(phoneNum)) {
            return 0;
        } else if ("137".equals(phoneNum)) {
            return 1;
        } else if ("138".equals(phoneNum)) {
            return 2;
        } else if ("139".equals(phoneNum)) {
            return 3;
        }
        // Every other prefix goes to the fifth partition
        return 4;
    }
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the configuration and initialize the Job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the class whose jar should be shipped
        job.setJarByClass(FlowSortDriver.class);

        // Set the Mapper and Reducer classes
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // Mapper output types
        job.setMapOutputKeyClass(FlowSortBean.class);
        job.setMapOutputValueClass(Text.class);

        // Reducer output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowSortBean.class);

        // Custom partitioner; the number of reduce tasks must cover all five partitions
        job.setPartitionerClass(FlowSortPartitioner.class);
        job.setNumReduceTasks(5);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("g:\\mapreduce\\flow\\flowsort\\in"));
        FileOutputFormat.setOutputPath(job, new Path("g:\\mapreduce\\flow\\flowsort\\partitionout"));

        // Submit the job and wait for completion
        if (job.waitForCompletion(true)) {
            System.out.println("Job finished!");
        } else {
            System.out.println("Job failed!");
        }
    }
}
Note: when writing the Mapper class, be careful with the output key/value types. The key must be FlowSortBean, because the sorting that happens between the Mapper and the Reducer (and only the sorting) is driven by the Mapper's output key, whereas partitioning can be based on either the key or the value.
IV. Combiner Merging
The Combiner is a component alongside the Mapper and Reducer. After a map task's output has spilled from the circular buffer and partitioning and sorting are complete, the Combiner performs a local aggregation, which reduces the volume of data shuffled over the network and thereby optimizes the MR job.
The Combiner pays off only once the data volume reaches a certain scale; on small data sets the benefit is not noticeable.
For example, in the WordCount program, once the word file grows large enough, a custom Combiner can be used as an optimization:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Running count for this word
        int count = 0;
        // Sum up the partial counts
        for (IntWritable v : values) {
            count += v.get();
        }
        // Emit the locally aggregated count
        context.write(key, new IntWritable(count));
    }
}
Then register the Combiner class in the Driver:
job.setCombinerClass(WordCountCombiner.class);
If you look closely, WordCount's custom Combiner class is exactly the same as its Reducer class, because their logic is identical: a first round of summing happens per partition after each map task, and the Reducer then performs the final summation. So the Combiner can also be set like this:
job.setCombinerClass(WordCountReducer.class);
Note: a Combiner must only be used when it does not change the final business result. Averaging is the classic counterexample:

Mapper output, two partitions:  3, 5, 7  => avg = 5
                                2, 6     => avg = 4
Reducer merging the averages:   5, 4    => avg = 4.5, but the true average is (3+5+7+2+6)/5 = 4.6. Wrong!

So whenever you use a Combiner, make sure it cannot affect the final result! A combiner-safe way to average is sketched below.
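As a sketch only (not from the original text, with a made-up value layout): averaging can be made Combiner-safe by having the Combiner forward partial sums and counts instead of averages, encoded here as a hypothetical "sum,count" Text value, so that only the final Reducer divides:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical Combiner for averaging: emits "sum,count" pairs so the
// Reducer can compute sum/count exactly, however the maps were split.
public class AvgCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text v : values) {
            // Each incoming value carries a partial "sum,count"
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        // Pass the combined partial aggregate downstream; only the final
        // Reducer divides sum by count to produce the average.
        context.write(key, new Text(sum + "," + count));
    }
}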