第 20 节 Flink Counters(scala)
程序员文章站
2022-03-14 19:02:44
...
上篇:第 19 节 Flink Broadcast广播变量-(scala代码)
1、Flink Accumulators & Counters
Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化 可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Counter是一个具体的累加器(Accumulator)实现
IntCounter, LongCounter 和 DoubleCounter
用法
1:创建累加器
private IntCounter numLines = new IntCounter();
2:注册累加器
getRuntimeContext().addAccumulator(“num-lines”, this.numLines);
3:使用累加器
this.numLines.add(1);
4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult(“num-lines”)
2、具体代码实现
package batch
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**
* Counter累加器
*/
object BatchDemoCounterScala {
def main(args: Array[String]): Unit = {
//获取flink的运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//隐式转化
import org.apache.flink.api.scala._
val data=env.fromElements("a","b","c","d")
val res= data.map(new RichMapFunction[String,String] {
//1、定义累加器
val numlines = new IntCounter()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//2、注册累加器
getRuntimeContext.addAccumulator("num-lines",this.numlines)
}
override def map(value: String): String = {
this.numlines.add(1)
value
}
}).setParallelism(4)
res.writeAsText("d:\\data\\count20")
val jobResult = env.execute("BatchDemoCounterScala")
//3、获取累加器
val num = jobResult.getAccumulatorResult[Int]("num-lines")
println("num:"+num)
}
}
启动idea的代码程序,控制台打印信息:
d盘都可以查看文件