Flink(18):Flink之累加器
程序员文章站
2022-07-14 14:25:04
...
目录
0. 相关文章链接
1. Flink中的累加器概述
Flink中的累加器,与Mapreduce counter的应用场景类似,可以很好地观察task在运行期间的数据变化,如在Flink job任务中的算子函数中操作累加器,在任务执行结束之后才能获得累加器的最终结果。
Flink有以下内置累加器,每个累加器都实现了Accumulator接口。
- IntCounter
- LongCounter
- DoubleCounter
2. 编码步骤
- 创建累加器:private IntCounter numLines = new IntCounter();
- 注册累加器:getRuntimeContext().addAccumulator("num-lines", this.numLines);
- 使用累加器:this.numLines.add(1);
- 获取累加器的结果:myJobExecutionResult.getAccumulatorResult("num-lines")
3. 代码演示
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
/**
* Author itcast
* Desc 演示Flink累加器,统计处理的数据条数
*/
public class OtherAPI_Accumulator {
public static void main(String[] args) throws Exception {
//1.env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.Source
DataSource<String> dataDS = env.fromElements("aaa", "bbb", "ccc", "ddd");
//3.Transformation
MapOperator<String, String> result = dataDS.map(new RichMapFunction<String, String>() {
//-1.创建累加器
private IntCounter elementCounter = new IntCounter();
Integer count = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//-2注册累加器
getRuntimeContext().addAccumulator("elementCounter", elementCounter);
}
@Override
public String map(String value) throws Exception {
//-3.使用累加器
this.elementCounter.add(1);
count+=1;
System.out.println("不使用累加器统计的结果:"+count);
return value;
}
}).setParallelism(2);
//4.Sink
result.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE);
//5.execute
//-4.获取加强结果
JobExecutionResult jobResult = env.execute();
int nums = jobResult.getAccumulatorResult("elementCounter");
System.out.println("使用累加器统计的结果:"+nums);
}
}
此博客根据某马2020年贺岁视频改编而来:https://www.bilibili.com/video/BV1oX4y1K7kM
上一篇: JavaScript 手写call,apply,bind的实现
下一篇: 队列的多种C语言实现
推荐阅读
-
Flink DataStream API之Operators
-
Flink实战之合并小文件
-
flink学习之六-数据持久化to-kafka
-
Flink(18):Flink之累加器
-
1.11.Flink DataSetAPI、DataSet API之Data Sources、DataSet API之Transformations、DataSet Sink部分详解
-
Flink 自定义Sink 之 写入HDFS
-
Flink 批处理之DataSet
-
【09】Flink 之 DataSet API(三):DataSet Sink 操作
-
Flink批量处理之DataSet
-
Flink之DataSet语义注释、广播变量、分布式缓存及参数传递