欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink DataSet API 之 Accumulators & Counters(累加器)

程序员文章站 2022-03-08 09:43:57
...

基本介绍

1、Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化。可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

2、Counter是一个具体的累加器(Accumulator)实现。例如:IntCounter, LongCounter 和 DoubleCounter

用法

1:创建累加器:      private IntCounter numLines = new IntCounter();

2:注册累加器:      getRuntimeContext().addAccumulator("num-lines", this.numLines);

3:使用累加器:             this.numLines.add(1);

4:获取累加器的结果:  myJobExecutionResult.getAccumulatorResult("num-lines")

使用Demo

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object BatchDemoCounter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    val data = env.fromElements("a","b","c","d")

    val res = data.map(new RichMapFunction[String, String] {
      val numLines = new IntCounter()//定义
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        getRuntimeContext.addAccumulator("num-lines", numLines)// 注册
      }

      override def map(in: String): String = {
        this.numLines.add(1)// 使用
        in
      }
    }).setParallelism(4)
    res.writeAsText("d:/sshdata/count").setParallelism(1)
    val jobResult = env.execute("BatchDemoCounterScala")
    val num = jobResult.getAccumulatorResult[Int]("num-lines")// 获取
    println(num)
  }
}

 

相关标签: Flink