Secondary Sort and the Overall MapReduce Flow
I. Secondary Sort
Requirement: given an order data file where each record contains an order id, a product id, and a product price, sort by order id in ascending order and by product price in descending order, produce as many result files as there are order ids, and have each result file contain only the single record of that order's most expensive product.
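To make the requirement concrete, here is a hypothetical sample input (tab-separated: order id, product id, price) and the expected result — the data values are made up for illustration:

Input:
1	pd001	222.8
1	pd002	33.8
2	pd003	522.8
2	pd004	122.4
3	pd005	222.8
3	pd006	25.8

Expected output (three files, one per order, each holding that order's most expensive record):
1	222.8
2	522.8
3	222.8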
Approach: 1. Encapsulate the order in an OrderBean class that implements the WritableComparable interface;
2. Write a custom Mapper class: determine the input and output types and implement the business logic;
3. Write a custom Partitioner that returns a different partition number for each order id;
4. Write a custom Reducer class;
5. Write the secondary-sort class OrderGroupingComparator, extending WritableComparator: define a no-argument constructor and override the compare method;
6. Write the Driver class.
The code is as follows:
/**
 * @author: princesshug
 * @date: 2019/3/25, 21:42
 * @blog: https://www.cnblogs.com/hellobigtable/
 */
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {
    private int orderId;
    private double orderPrice;

    public OrderBean() {
    }

    public OrderBean(int orderId, double orderPrice) {
        this.orderId = orderId;
        this.orderPrice = orderPrice;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(double orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return orderId + "\t" + orderPrice;
    }

    @Override
    public int compareTo(OrderBean o) {
        // Sort by order id ascending; within the same order, by price descending
        if (this.orderId > o.getOrderId()) {
            return 1;
        } else if (this.orderId < o.getOrderId()) {
            return -1;
        } else {
            return Double.compare(o.getOrderPrice(), this.orderPrice);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(orderPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        orderPrice = in.readDouble();
    }
}
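To make the ordering concrete, here is a minimal sketch (using the hypothetical sample values from earlier) of what compareTo produces:

public class OrderBeanSortDemo {
    public static void main(String[] args) {
        OrderBean a = new OrderBean(1, 222.8);
        OrderBean b = new OrderBean(1, 33.8);
        OrderBean c = new OrderBean(2, 522.8);
        System.out.println(a.compareTo(b)); // negative: same order, higher price sorts first
        System.out.println(a.compareTo(c)); // negative: smaller order id sorts first
    }
}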
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();
        // Split the tab-separated fields: order id, product id, price
        String[] fields = line.split("\t");
        // Wrap the order id and price in an OrderBean key
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(orderId, orderPrice);
        // Emit the bean; the value carries no information
        context.write(orderBean, NullWritable.get());
    }
}
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        // numPartitions is the number of reduce tasks; masking with
        // Integer.MAX_VALUE clears the sign bit so the result is non-negative
        return (orderBean.getOrderId() & Integer.MAX_VALUE) % numPartitions;
    }
}
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // reduce() is called once per group; since keys within a group arrive
        // sorted price-descending, writing only the first key keeps just the
        // most expensive record of each order
        context.write(key, NullWritable.get());
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {
    // super() must be called to register OrderBean as the class being compared
    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Compare only the order id, so that every record of one order
        // falls into a single reduce group
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        if (aBean.getOrderId() > bBean.getOrderId()) {
            return 1;
        } else if (aBean.getOrderId() < bBean.getOrderId()) {
            return -1;
        } else {
            return 0;
        }
    }
}
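To see how the grouping comparator and the key sort work together, consider the partition that receives order 1 from the hypothetical sample input above. The sorted keys arrive as

1	222.8
1	33.8

Because the comparator reports the two keys as equal (same order id), they form a single reduce group, and reduce() sees the first key of the group — the most expensive record — which is the only one written out.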
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Configuration and job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Jar containing the job classes
        job.setJarByClass(OrderBean.class);
        // Mapper and Reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // Mapper output types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Reducer output types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Grouping comparator for the secondary sort
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        // Partitioner
        job.setPartitionerClass(OrderPartitioner.class);
        // Number of reduce tasks: one per order id in the sample data,
        // so each result file holds exactly one order
        job.setNumReduceTasks(3);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("g:\\mapreduce\\order\\in"));
        FileOutputFormat.setOutputPath(job, new Path("g:\\mapreduce\\order\\out"));
        // Submit the job
        if (job.waitForCompletion(true)) {
            System.out.println("Job finished!");
        } else {
            System.out.println("Job failed!");
        }
    }
}
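With setNumReduceTasks(3) and the partitioner above (orderId % 3), each sample order lands in its own result file — a sketch, assuming the hypothetical input from earlier:

part-r-00000:	3	222.8
part-r-00001:	1	222.8
part-r-00002:	2	522.8

Note that the number of reduce tasks has to match the number of distinct order ids for the "one file per order" requirement to hold.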
Since this is code I have typed many times, I did not add many comments originally — please bear with me!
II. The Overall MapReduce Flow
1. There is a 200 MB text file; the data to be processed is first submitted to the client;
2. The client submits the split information to the YARN platform, and YARN computes that 2 MapTasks are needed (200 MB at the default 128 MB split size gives 2 splits);
3. By default the program uses FileInputFormat's TextInputFormat to read the file data into the MapTasks;
4. Each MapTask runs the business logic and then writes its output through the context into the circular buffer;
5. The circular buffer is simply a region allocated in memory; when the data in it reaches 80% of its default size of 100 MB, a spill to disk occurs (the defaults behind this are shown in the sketch after this list);
6. The spilled data goes through several rounds of partitioning and sorting (the shuffle mechanism, explained in detail in the next post);
7. The partitioned and sorted blocks can optionally be merged by a Combiner before being written to local disk;
8. Once all MapTasks have finished completely, the ReduceTasks start reading the MapTask output from disk, merging the files and merge-sorting them (this is where the secondary sort above takes place);
9. The Reducer reads one group of data at a time and writes the results to the output files using the default TextOutputFormat.
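As a reference for steps 2 and 5, here is a minimal sketch, assuming Hadoop 2.x property names and the default 128 MB HDFS block size; the numbers are the stated defaults, not values you normally need to set:

import org.apache.hadoop.conf.Configuration;

public class ShuffleDefaultsSketch {
    public static void main(String[] args) {
        // Step 2: the split size defaults to the HDFS block size, so a
        // 200 MB file yields ceil(200 / 128) = 2 splits, hence 2 MapTasks
        long blockSize = 128L * 1024 * 1024;
        long fileSize  = 200L * 1024 * 1024;
        long mapTasks  = (fileSize + blockSize - 1) / blockSize;
        System.out.println("MapTasks = " + mapTasks); // 2

        // Step 5: the circular buffer is 100 MB by default and spills at 80%
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.mb", 100);
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
    }
}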