MapReduce框架原理之(二)MapReduce工作流程
MapReduce框架原理之MapReduce工作流程
MapReduce工作流程
1. 流程图
- MapReduce流程图(1)
- MapReduce流程图(2)
2. 流程详解
MapReduce执行机制
这里只是指出个人理解的部分,帮助记忆MapReduce的工作流程,实际上细节还有很多,有不妥的地方还请多多指教.
实际上,我们在Driver调用了job.waitForCompletion后,客户端并不是马上将job提交给YARN,在向YARN提交job之前,客户端会先通过反射,获取到job将要使用到的InputFormat,以获得逻辑的切片规则,并将切片规则记录到本地的文件中:windowns端的话在C:\tmp\hadoop-PC_NAME\mapred\staging\PC_NAMEJOBID\.staging\job_localJOBID目录下
(执行完毕后会被删除)
(注:InputFormat只是进行逻辑切片规则的指定,而不是真正的进行物理切片,真正的物理切片动作是再RecordReader中进行的
)
制定好切片规则后(当然这个过程很复杂,还有很多其他步骤),会将这些job文件(job.xml等)提交给YARN,然后YARN会根据切片规则等信息分配出相应的MapTask,在MapTask阶段通过RecordReader来进行真正的文件切分(物理上的),并将切片后的数据制作成K,V对的形式,交给Mapper的map方法,一般来讲一个切片的数据会启用一个MapTask,在MapTask和ReduceTask中还有一步shuffle的操作,这个先略过,后面会进行说明.经过MapTask阶段的处理,数据来到了ReduceTask,并且一般来讲,相同Key的数据会交给同一个ReduceTask来处理,在ReduceTask中的reduce方法里,写了我们的业务代码,将数据封装成我们需要的格式后输出到目标目录,输出时会根据所指定的OutputFormatClass
类型,来对数据进行输出格式化.
3. shuffle机制
上面提到过shuffle,它在MapTask和ReduceTask中都有存在.
3.1 MapTask中:
- 从map方法通过context.write(key, value)输出出来的K,V数据,会被写入到内存的
环形缓冲区
,它的默认大小为100M
,在其中某一点作为起点,顺时针方向写入数据,逆时针方向写入数据对应的索引等元数据信息.(在环状缓冲区内会进行分区,和第1次排序,快排
) - 每当写入的数据达到环状缓冲区80%的容量时,会发生溢出操作,将数据持久到磁盘,根据数据的量大小,会生成N个溢出文件.
- 这些文件会被合并成大的溢出文件交给Mapper.(
第2次排序,归并排序
) - 在溢出和合并的过程中都会调用Partitioner进行分区并同时针对Key进行排序,默认的分区数总是0.
3.2 ReduceTask中:
- ReduceTask根据自己的分区号,去MapTask机器上找相应的结果分区数据.
- ReduceTask会取到分布在不同MapTask上的同一个分区的数据,ReduceTask会将这些文件再进行合并(
第3次排序,归并排序
) - 行成大文件后,Shuffle的过程就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用reduce方法)
Shuffle环形缓冲区大小会影响到MapReduce的执行效率,原则上说,缓冲区越大磁盘IO的次数就越少,执行速度越快. 缓冲区的大小可以通过调整,io.sort.mb来进行修改,默认是100M
3.3 Partition分区:
- 问题引出
要求将统计结果按条件输出到不同文件中
(分区),比如将统计结果按手机归属地不同省份输出到不同文件中. - 默认Partitioner分区:
protected int getPartition(int hash, int numReduceTasks) {
return (hash & 2147483647) % numReduceTasks;
}
mapred-default.xml
<property>
<name>mapreduce.job.reduces</name>
<value>1</value>
<description>The default number of reduce tasks per job. Typically set to 99%
of the cluster's reduce capacity, so that if a node fails the reduces can
still be executed in a single wave.
Ignored when mapreduce.framework.name is "local".
</description>
</property>
在Java中,Integer的值用32进制来表示的,而2147483647
代表Integer的最大值.这里用hash
值和Integer的最大值做位与运算,最大值总是01111111…即首位是0,其余位置都是1,任何正数与值进行位与计算,结果都是其本身,而负数则被转换成正数.
而默认的numReduceTasks
是1,所以mod结果总是0,即代表默认的只有一个ReduceTask,并且对数据不进行分区.
-
自定义Partitioner步骤
(1)自定义类继承Partitioner,重写getPartition()方法
(2)在Driver类中设置自定义的PartitionerClass
job.setPartitionerClass(xxxxx.class);
(3)自定义Parititioner后要根据其逻辑设置相应的ReduceTastk
job.setNumReduceTasks(5); -
分区总结
(1)如果ReduceTask的数量 > getPartition的结果数,则最终生成的文件会多产生几个part-r-000xx的空文件;
(2)如果1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,则会抛异常;
(3)如果ReduceTask的数量 = 1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask来处理,最终也只产生一个part-r-00000文件;
(4)分区号必须从0开始逐一累加.e.g:
假设自定义分区数为5,则:
<1> job.setNumReduceTasks(1);会正常运行,只不过只产生一个输出文件
<2> job.setNumReduceTasks(2);会报错
<3> job.setNumRedeceTasks(6);会正常执行,只不过会产生6个输出文件,其中一个是空的
-
实操演练
(1)需求,将手机按号段输出到不同文件, 136、137、138、139分别放到4个文件,其他号段放一个文件中.
(2)业务代码基于我的另一篇有关Hadoop序列化的博客中的demo,来进行演示,如果对序列化相关内容感兴趣可以参考Hadoop之序列化
自定义Partitioner
package partition;
import cstmbean.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PhonePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int i) {
String code = text.toString().substring(0, 3);
int partition = 0;
switch(code) {
case "136":
partition = 0;
break;
case "137":
partition = 1;
break;
case "138":
partition = 2;
break;
case "139":
partition = 3;
break;
default:
partition = 4;
}
return partition;
}
}
Driver类中
job.setPartitionerClass(PhonePartitioner.class);
job.setNumReduceTasks(5);
执行后会生成下面的5个文件
3.4 WritableComParable排序
-
排序概述
排序是MapReduce的重要操作之一.
MapTask和ReduceTask均会对数据按照key
进行排序.该操作属于Hadoop的默认行为.任何应用程序中的数据均会被排序,而不管逻辑上是否需要
.
默认的排序是按照字典顺序排序
,而且实现排序的方法是快速排序
.对于MapTask,它会将处理的结果暂时放到环形缓冲区中,
当环形缓冲区使用率达到阈值时(默认80%),再对缓冲区内的数据进行一次快速排序
,并将这些数据溢出,写到磁盘上,而当数据处理完毕后,它会对所有该MapTask溢出到磁盘的文件进行归并排序
.
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件(按照分区拷贝
),如果文件大小超过一定阈值,则溢出到磁盘上,否则存储在内存中.如果磁盘上文件数量达到一定阈值,则进行一次归并排序以生成一个更大的文件;当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序.
-
排序的分类
(1)部分排序
MapReduce根据输入记录的键,对数据进行排序.保证输出的每个文件内部都有序
.
(2)全排序最终输出结果只有一个文件,且文件内部有序
.实现方法是只使用一个ReduceTask,但是该方法在处理大型文件时效率极低,因为一台机器处理所有的文件,完全丧失了MapReduce所提供的并行架构.
(3)辅助排序(GroupingComparator分组)
在Reduce端对key进行分组.应用于:key为自定义bean对象时,想让一个或多个字段进行比较后,相同的key进入到同一个reduce方法.
(4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个,即为二次排序. -
WritableComparable实现
需要在自定义的Bean中,实现WritableComparable接口,而不是实现此前的Writable接口.
而实际上,它继承了Writable和Comparable接口
// 注意,接口的泛型要传入当前类对象
public class FlowBean implements WritableComparable<FlowBean> {
....
....
@Override
public int compareTo(FlowBean o) {
int result;
// 按照总流量大小,倒序排列
if (sumFlow > bean.getSumFlow()) {
result = -1;
}else if (sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}
}
3.5 Combiner合并
- 概述
(1)Combiner是MR程序中,Mapper和Reducer之外的一种组件.
(2)Combiner组件的父类就是Reducer.
(3)Combiner和Reducer的区别在于运行的位置
Combiner是在每个MapTask
所在的节点运行.
Reducer是接收全局所有的MapTask的输出结果
.
(4)Combiner的意义就是对每个MapTask的输出结果进行局部汇总,来减少网络IO.
(5)Combiner能够应用的前提是,必须不能影响最终的业务逻辑
,并且输出的K,V应该跟Reducer接收的K,V一致.
比如下面这个求平均数的例子
Mapper Reducer
3 5 7 -> (3+5+7) / 3 = 5 (3+5+7+2+6) / 5 = 4.6 不等于(5+4) / 2 = 4.5
2 6 -> (2+6) / 2 = 4
- 自定义Combiner实现步骤
自定义一个Conbiner类,继承Reducer,重写reduce方法,哪WordCount来演示:
(1)自定义Combiner类WcConbiner
package combiner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WcCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable count = new LongWritable(0);
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
count.set(sum);
context.write(key, count);
}
}
Mapper和Reducer就不写了不会写的可以看我关于MapReduce概述的那篇...这里只把Driver列出来
package wordcount;
import combiner.WcCombiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WcDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WcDriver.class);
job.setMapperClass(WcMapper.class);
// ※这里,要告诉job我们此次任务需要用到的Conbiner类
job.setCombinerClass(WcCombiner.class);
job.setReducerClass(WcReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean rtn = job.waitForCompletion(true);
System.exit(rtn ? 0 : 1);
}
}
我们观察一下使用Combiner前后的日志内容:
未使用Combiner
使用Combiner
3.6 GroupingComparator分组(辅助排序)
对Reduce阶段的数据,根据某一个或者几个字段进行分组
- 分组排序步骤
(1)自定义类继承WritableComparator
(2)重写compare()方法
(3)创建一个构造函数,将比较对象的类传给父类 - 自定义分组类演示
(1)需求
有如下订单
订单id | 商品id | 订单金额 |
---|---|---|
00000001 | Pdt_01 | 33.8 |
00000001 | Pdt_02 | 222.8 |
00000002 | Pdt_03 | 522.8 |
00000002 | Pdt_04 | 122.4 |
00000002 | Pdt_05 | 722.4 |
00000003 | Pdt_06 | 33.8 |
00000003 | Pdt_02 | 232.8 |
要求输出每个订单中最贵的商品
(2)期望数据
00000001 222.8
00000002 722.4
00000003 232.8
(3)GroupingComparator的作用
自定义Bean,我们为类起名OrderBean,实现了WritableComparable接口后,重写compareTo方法,利用二次排序仅仅能达到按订单排序,并且订单内金额降序排序
,值得注意的是,MapTask将结果输出给ReduceTask之前,会按key
进行分组,这里我们输出的key类型为自定义的OrderBean,则MapTask会按照OrderBean中的compareTo方法的排序规则进行分组
,所以最终得到的是7
个完全不同的分组,这就意味着会调用7次reduce
方法.
而我们的需求是输出订单中金额最大的数据.这时候GroupingComparator就派上用场了.
我们需要自定义一个类,并继承WritableComparator类,在类中:
①重写compare
方法,该方法内的比较规则即为reduce端的分组规则.
②需要显示的声明一个构造函数,并将要比较的key类型传给super.
- 代码实现
GroupingComparator类
package grouping;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GroupingComparator extends WritableComparator {
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean beanA = (OrderBean) a;
OrderBean beanB = (OrderBean) b;
int result = 0;
if(beanA.getOrderId().compareTo(beanB.getOrderId()) > 0) {
result = 1;
} else if (beanA.getOrderId().compareTo(beanB.getOrderId()) < 0) {
result = -1;
}
return result;
}
public GroupingComparator() {
super(OrderBean.class, true);
}
}
OrderBean类
package grouping;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private String productId;
private Double price;
@Override
public String toString() {
return this.orderId + "\t" + this.productId + "\t" + this.price.toString();
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.orderId);
dataOutput.writeUTF(this.productId);
dataOutput.writeDouble(this.price);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.orderId = dataInput.readUTF();
this.productId = dataInput.readUTF();
this.price = dataInput.readDouble();
}
@Override
public int compareTo(OrderBean o) {
int result = 0;
if(this.orderId.compareTo(o.getOrderId()) > 0) {
result = 1;
} else if (this.orderId.compareTo(o.getOrderId()) < 0) {
result = -1;
} else {
// 相同订单内金额倒序排列
if(this.price.compareTo(o.getPrice()) > 0) {
result = -1;
} else if (this.price.compareTo(o.getPrice()) < 0) {
result = 1;
} else {
result = 0;
}
}
return result;
}
}
Mapper类
package grouping;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
private OrderBean bean = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\t");
bean.setOrderId(words[0]);
bean.setProductId(words[1]);
bean.setPrice(Double.parseDouble(words[2]));
context.write(bean, NullWritable.get());
}
}
Reducer类
package grouping;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
Driver类
package grouping;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
FileSystem fs = FileSystem.get(conf);
Path oPath = new Path("i:\\output");
if (fs.exists(oPath)) {
fs.delete(oPath, true);
}
job.setJarByClass(OrderDriver.class);
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
job.setGroupingComparatorClass(GroupingComparator.class);
FileInputFormat.addInputPath(job, new Path("i:\\input"));
FileOutputFormat.setOutputPath(job, oPath);
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
下一篇: node实现简单爬虫功能