flink transformation
程序员文章站
2024-03-05 18:10:07
...
1、Reduce
通过reduce可以实现average, sum, min, max, count 等功能 ,reduce第一个参数以reduce操作结果,
第二个参数是当前元素。reduce每传入一个元素生成一个新的元素
下面例子的功能:统计相同名称产品价格总和
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Produce> ds = env.fromCollection(StudentUtil.getStudentList());
//根据产品名称分组
KeyedStream<Produce, String> ks = ds.keyBy(new KeySelector<Produce, String>() {
@Override
public String getKey(Produce s) throws Exception {
return s.getName();
}
});
SingleOutputStreamOperator<Produce> reds = ks.reduce(new ReduceFunction<Produce>() {
@Override
public Produce reduce(Produce s, Produce t1) throws Exception {
//相同名称的产品价格求和(average, sum, min, max, count等功能只需要在这个方法中实现)
return new Produce(s.getName(), s.getPrice() + t1.getPrice());
}
}).filter(new FilterFunction<Produce>() {
//filter 做了个简单的输出
@Override
public boolean filter(Produce s) throws Exception {
System.out.println(String.format("输出结果 name = %s, price = %d", s.getName(), s.getPrice()));
return true;
}
});
try {
env.execute("测试 reduce");
} catch (Exception e) {
e.printStackTrace();
}
输入数据
输出结果 name = name2, price = 2
输出结果 name = name2, price = 4
输出结果 name = name4, price = 4
输出结果 name = name0, price = 3
输出结果 name = name4, price = 3
输出结果 name = name2, price = 4
输出结果 name = name2, price = 1
输出结果 name = name0, price = 0
输出结果 name = name1, price = 3
输出结果 name = name4, price = 1
输出结果
输出结果 name = name2, price = 2
输出结果 name = name2, price = 6
输出结果 name = name4, price = 4
输出结果 name = name0, price = 3
输出结果 name = name4, price = 7
输出结果 name = name2, price = 10
输出结果 name = name2, price = 11
输出结果 name = name0, price = 3
输出结果 name = name1, price = 3
输出结果 name = name4, price = 8
2、Aggregations
DataStream API 提供了多种聚合方式,例如 min,max,sum 等。这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。
*注意:maxby 返回最大值所在的那条数据,max是将最大值赋予当前数据所对应的属性,返回当前数据,min和minby同理
代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//Tuple3Util.getTuple2List()是获取一个集合
DataStream<Tuple3<String, Integer, String>> ds = env.fromCollection(Tuple3Util.getTuple2List());
ds.keyBy(0).sum(1).filter(new FilterFunction<Tuple3<String, Integer, String>>() {
//只用于输出
@Override
public boolean filter(Tuple3<String, Integer, String> t) throws Exception {
System.out.println(String.format("产品名称 = %s, 产品价格 = %d, 产品备注 = %s", t.f0, t.f1, t.f2));
return true;
}
});
env.execute("test Aggregations");
1、maxby 方式聚合
一、输入数据:
产品名称 = name2, 产品价格 = 0, 产品备注 = 备注 = 0
产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5
产品名称 = name4, 产品价格 = 5, 产品备注 = 备注 = 8
产品名称 = name0, 产品价格 = 3, 产品备注 = 备注 = 9
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 3
产品名称 = name1, 产品价格 = 0, 产品备注 = 备注 = 1
产品名称 = name3, 产品价格 = 0, 产品备注 = 备注 = 8
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 5
产品名称 = name1, 产品价格 = 9, 产品备注 = 备注 = 9
二、输出结果
产品名称 = name2, 产品价格 = 0, 产品备注 = 备注 = 0
产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5
产品名称 = name4, 产品价格 = 5, 产品备注 = 备注 = 8
产品名称 = name0, 产品价格 = 3, 产品备注 = 备注 = 9
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9
产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5
产品名称 = name3, 产品价格 = 0, 产品备注 = 备注 = 8
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 5
产品名称 = name1, 产品价格 = 9, 产品备注 = 备注 = 9
2、max方式
一、输入数据:
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name0, 产品价格 = 0, 产品备注 = 备注 = 6
产品名称 = name4, 产品价格 = 1, 产品备注 = 备注 = 3
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 6
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 7
产品名称 = name0, 产品价格 = 6, 产品备注 = 备注 = 9
产品名称 = name1, 产品价格 = 3, 产品备注 = 备注 = 4
产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7
产品名称 = name0, 产品价格 = 0, 产品备注 = 备注 = 1
产品名称 = name3, 产品价格 = 1, 产品备注 = 备注 = 1
二、输出结果
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name4, 产品价格 = 1, 产品备注 = 备注 = 3
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 6
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name1, 产品价格 = 3, 产品备注 = 备注 = 4
产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7
上一篇: PHP页面跳转实现延时跳转的方法
下一篇: Different is Good
推荐阅读
-
Apache Flink 进阶(八):详解 Metrics 原理与实战 阿里巴巴百度apache
-
Ververica Platform-阿里巴巴全新Flink企业版揭秘 阿里巴巴apache金融c
-
Apache Flink 的迁移之路,2 年处理效果提升 5 倍 apache框架游戏
-
Flink入门(十六) State
-
1.11 flink读取本地文件例子以及细节
-
flink interval join 不同时间窗口计算结果相同问题
-
Spark Transformation
-
flink transformation
-
Flink SQL 如何实现数据流的 Join? json
-
Flink作业转换-JobGraph