版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:[email protected],如有任何问题,可随时联系。
1 累加器应用场景
Accumulator即累加器,与Saprk Accumulator 的应用场景差不多,都能很好地观察task在运行期间的数据变化 可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。 Counter是一个具体的累加器(Accumulator)实现
2 Flink与SparkStreaming双向应用对比
2.1 Flink累加器使用
Counter是一个具体的累加器(Accumulator)实现,如IntCounter, LongCounter 和 DoubleCounter。
用法
-
1:创建累加器
private IntCounter numLines = new IntCounter();
-
2:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
-
3:使用累加器
this.numLines.add(1);
-
4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")
2.2 SparkStreaming累加器使用
-
1:设置自定义累加器,实现所有数据的统计功能,注意累加器也是懒执行的
val sessionAggrStatAccumulator = new SessionAggrStatAccumulator
-
2:自定义累加器实现
class SessionAggrStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] { // 保存所有聚合数据 private val aggrStatMap = mutable.HashMap[String, Int]() override def isZero: Boolean = { aggrStatMap.isEmpty } override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = { val newAcc = new SessionAggrStatAccumulator aggrStatMap.synchronized{ newAcc.aggrStatMap ++= this.aggrStatMap } newAcc } override def reset(): Unit = { aggrStatMap.clear() } override def add(v: String): Unit = { if (!aggrStatMap.contains(v)) aggrStatMap += (v -> 0) aggrStatMap.update(v, aggrStatMap(v) + 1) } override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = { other match { case acc:SessionAggrStatAccumulator => { (this.aggrStatMap /: acc.value){ case (map, (k,v)) => map += ( k -> (v + map.getOrElse(k, 0)) )} } } } override def value: mutable.HashMap[String, Int] = { this.aggrStatMap } } 复制代码
-
3:注册自定义累加器
sc.register(sessionAggrStatAccumulator, "sessionAggrStatAccumulator") 复制代码
-
4:累加器值得获取与使用
calculateAndPersistAggrStat(spark, sessionAggrStatAccumulator.value, taskUUID) sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s); 复制代码
3 Flink 累加器使用案例实战
3.1 注意:只有在任务执行结束后,才能获取到累加器的值
3.2 注意:RichMapFunction的使用发生在高级特性上。
public class BatchDemoCounter {
public static void main(String[] args) throws Exception{
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> data = env.fromElements("a", "b", "c", "d");
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
//1:创建累加器
private IntCounter numLines = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//2:注册累加器
getRuntimeContext().addAccumulator("num-lines",this.numLines);
}
//int sum = 0;
@Override
public String map(String value) throws Exception {
//如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
//sum++;
//System.out.println("sum:"+sum);
this.numLines.add(1);
return value;
}
}).setParallelism(8);
//result.print();
result.writeAsText("d:\\BatchDemoCounter");
JobExecutionResult jobResult = env.execute("counter");
//3:获取累加器
int num = jobResult.getAccumulatorResult("num-lines");
System.out.println("num:"+num);
}
}
复制代码
3.3 结果展示(writeAsText触发后才能读取到counter)
也即提前执行了一波action操作,然后才能取出其值。
num:4
复制代码
总结
微观看世界,本文虽小,五脏俱全。为了成一套体系,不得已而为之,请期待后面有关Spark和kafka的源码解读系列。辛苦成文,彼此珍惜,谢谢。
秦凯新 于深圳 201811251651