combiner中使用状态模式
mapreduce中的combine过程
hadoop的map过程执行完成后,每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,减少在map和reduce节点之间的数据传输量,提高网络I/O性能。
Combiner最基本的就是实现本地key的聚合,对map输出的key 排序,value进行迭代。Combiner在本质上就是一个本地的Reducer,其输入<Key, Value>和输出<Key,Value>类型是一致的。如果不用Combiner,所有的结果都在Reducer中进行合并。
但是,不要以为在MapReduce程序中设置了Combiner,Combiner就一定能够执行,也不能假定Combiner执行的次数(由于我们项目中就对Combiner进行了仅能执行一次的假定,造成了程序执行的不确定性,有时能够正常执行,而出现问题时发现重新执行几次偶尔能够执行成功,这也是传说中的“撞大运编程”)。
Combiner函数可能会在map的merge操作完成之前,也可能在merge之后执行,这个时机由配置参数min.num.spill.for.combine指定(该值默认为3),也就是说在map端产生的spill文件最少有min.num.spill.for.combine的时候,Combiner函数会在merge操作合并最终的本机结果文件之前执行,否则在merge之后执行。通过这种方式,就可以在spill文件很多并且需要做conbine的时候,减少写入本地磁盘的数据量,同样也减少了对磁盘的读写频率,可以起到优化作业的目的。
hadoop文档中也有说明Combiner可能被执行也可能不被执行,如果当前集群在很繁忙的情况下job就是设置了也不会执行Combiner。
需要注意的是,虽然combiner使用合适可以提高Job执行作业的吞吐量,但不合适的应用场景可能导致输出结果不正确。Combiner的输出是Reducer的输入,绝不能改变最终的计算结果。
需求分析
最终回到项目实际问题的所在,我们需要执行的是一个汇总统计,这个汇总统计有这么几个需求:
- 根据结果数据,对结果需要进行合并统计分析,共两项指标,如果第一项指标大于4或第二项指标大于8时,最终的结果数据就为不合格;
- 如果最终的指标统计全部放到Reduce端来做,就会导致数据量过大,因为我们需要的执行结果为单个文件,Reducer的数量设置为1,因此需要Combiner的帮助;
- Combiner的执行次数不能影响最终结果(之前的版本就是因为依赖了Combiner的执行次数,导致了结果的不确定性)。
经过简单地分析,我们将这个过程简单抽象成三种不同的状态:
- 初始状态(当前没有值,当然这只是一种中间状态);
- 有数据状态(此时,数据会进行累积,但是还没有到达不合格状态);
- 不合格状态(这是一种最终状态,此时对需要combine的所有值,均不需要再进行任何操作);
状态模式
策略模式是围绕着可以互换的算法来创建成功的业务的,而状态模式通过改变对象内部状态来帮助对象控制自己的行为。
使用状态模式后处理的类图结构:
CombinerMediator在每个具体状态对象中合并过来的参数来改变自己的状态,已确定当前处于的状态;
IMapResultValue作为Mapper端输出的结果,分成几种类型,其中MapResultElement为Map端输出的单条记录;其余两种类型分别对应状态对象的输出结果。
最终结果还需要在Reducer端进行最后一次合并,其过程与Combiner类似,状态模式的使用保证无论经过多少次执行,最终结果状态都是一致的。
状态模式允许对象在内部状态改变时,改变其行为,对象看起来修改了它的类。每种状态都封装成了具体的类,并将动作委托到代表当前状态的对象,行为会根据内部状态的改变而改变,在这里CombinerMediator会随着状态的内部combineStat的改变而后续行为改变。
其类图与策略模式是一样的,区别就在于其意图。在状态模式中,我们将一群行为封装在状态对象中,CombinerMediator会随时可委托到那些状态对象中的一个,当前状态在不同的状态对象集合中游走改变,反映context内部的状态,CombinerMeditor的行为也会改变。相比于策略模式,我们把状态模式想像成是不用在中间对象CombinerMediator中放置过多条件判断的替代方法。