Flink DataSet API: Accumulators & Counters
Overview
1. An Accumulator in Flink serves much the same purpose as a MapReduce counter: both give you a convenient view of how data evolves while a task is running. You can update an accumulator from within an operator function in a Flink job, but the final result only becomes available after the job has finished.
2. A Counter is a concrete Accumulator implementation, for example IntCounter, LongCounter, and DoubleCounter.
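The built-in counters share the same basic interface. Below is a minimal standalone sketch (the object name and the values added are arbitrary; it assumes only flink-core on the classpath) showing the three counter types and their local values:

import org.apache.flink.api.common.accumulators.{DoubleCounter, IntCounter, LongCounter}

object CounterTypesSketch {
  def main(args: Array[String]): Unit = {
    // Each counter accumulates values of its own numeric type.
    val intCounter = new IntCounter()
    intCounter.add(1)

    val longCounter = new LongCounter()
    longCounter.add(100L)

    val doubleCounter = new DoubleCounter()
    doubleCounter.add(0.5)

    // getLocalValue returns what this local instance has accumulated;
    // inside a job, Flink merges the local values of all parallel subtasks.
    println(intCounter.getLocalValue)    // 1
    println(longCounter.getLocalValue)   // 100
    println(doubleCounter.getLocalValue) // 0.5
  }
}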
Usage
1. Create the accumulator: private IntCounter numLines = new IntCounter();
2. Register it: getRuntimeContext().addAccumulator("num-lines", this.numLines);
3. Update it: this.numLines.add(1);
4. Fetch the result: myJobExecutionResult.getAccumulatorResult("num-lines")
Demo
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object BatchDemoCounter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val data = env.fromElements("a", "b", "c", "d")
    val res = data.map(new RichMapFunction[String, String] {
      val numLines = new IntCounter() // 1. create the accumulator

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        getRuntimeContext.addAccumulator("num-lines", numLines) // 2. register it
      }

      override def map(in: String): String = {
        this.numLines.add(1) // 3. increment it for every record
        in
      }
    }).setParallelism(4)

    res.writeAsText("d:/sshdata/count").setParallelism(1)

    val jobResult = env.execute("BatchDemoCounterScala")
    val num = jobResult.getAccumulatorResult[Int]("num-lines") // 4. fetch the merged result
    println(num)
  }
}
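Since the input has four elements, map is invoked four times and each call increments the counter once. At job completion Flink merges the local counters from all parallel subtasks, so println(num) prints 4 even though the map operator runs with parallelism 4; a plain instance variable would not survive this merge across subtasks, which is why the accumulator is needed.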