欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

flink transformation

程序员文章站 2024-03-05 18:10:07
...

1、Reduce

	通过reduce可以实现average, sum, min, max, count 等功能 ,reduce第一个参数以reduce操作结果,
 第二个参数是当前元素。reduce每传入一个元素生成一个新的元素

下面例子的功能:统计相同名称产品价格总和

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Produce> ds = env.fromCollection(StudentUtil.getStudentList());
        //根据产品名称分组
        KeyedStream<Produce, String> ks = ds.keyBy(new KeySelector<Produce, String>() {
            @Override
            public String getKey(Produce s) throws Exception {
                return s.getName();
            }
        });
        SingleOutputStreamOperator<Produce> reds = ks.reduce(new ReduceFunction<Produce>() {
            @Override
            public Produce reduce(Produce s, Produce t1) throws Exception {
                //相同名称的产品价格求和(average, sum, min, max, count等功能只需要在这个方法中实现)
                return new Produce(s.getName(), s.getPrice() + t1.getPrice());
            }
        }).filter(new FilterFunction<Produce>() {
            //filter 做了个简单的输出
            @Override
            public boolean filter(Produce s) throws Exception {
                System.out.println(String.format("输出结果 name = %s, price = %d", s.getName(), s.getPrice()));
                return true;
            }
        });
        try {
            env.execute("测试  reduce");
        } catch (Exception e) {
            e.printStackTrace();
        }


输入数据

输出结果 name = name2, price = 2
输出结果 name = name2, price = 4
输出结果 name = name4, price = 4
输出结果 name = name0, price = 3
输出结果 name = name4, price = 3
输出结果 name = name2, price = 4
输出结果 name = name2, price = 1
输出结果 name = name0, price = 0
输出结果 name = name1, price = 3
输出结果 name = name4, price = 1

输出结果

输出结果 name = name2, price = 2
输出结果 name = name2, price = 6
输出结果 name = name4, price = 4
输出结果 name = name0, price = 3
输出结果 name = name4, price = 7
输出结果 name = name2, price = 10
输出结果 name = name2, price = 11
输出结果 name = name0, price = 3
输出结果 name = name1, price = 3
输出结果 name = name4, price = 8

2、Aggregations

DataStream API 提供了多种聚合方式,例如 min,max,sum 等。这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。

*注意:maxby 返回最大值所在的那条数据,max是将最大值赋予当前数据所对应的属性,返回当前数据,min和minby同理

代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //Tuple3Util.getTuple2List()是获取一个集合
        DataStream<Tuple3<String, Integer, String>> ds = env.fromCollection(Tuple3Util.getTuple2List());

        ds.keyBy(0).sum(1).filter(new FilterFunction<Tuple3<String, Integer, String>>() {
            //只用于输出
            @Override
            public boolean filter(Tuple3<String, Integer, String> t) throws Exception {
                System.out.println(String.format("产品名称 = %s, 产品价格 = %d, 产品备注 = %s", t.f0, t.f1, t.f2));
                return true;
            }
        });
        env.execute("test   Aggregations");

1、maxby 方式聚合

一、输入数据:

产品名称 = name2, 产品价格 = 0, 产品备注 = 备注 = 0
产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5
产品名称 = name4, 产品价格 = 5, 产品备注 = 备注 = 8
产品名称 = name0, 产品价格 = 3, 产品备注 = 备注 = 9
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 3
产品名称 = name1, 产品价格 = 0, 产品备注 = 备注 = 1
产品名称 = name3, 产品价格 = 0, 产品备注 = 备注 = 8
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 5
产品名称 = name1, 产品价格 = 9, 产品备注 = 备注 = 9

二、输出结果

产品名称 = name2, 产品价格 = 0, 产品备注 = 备注 = 0
产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5
产品名称 = name4, 产品价格 = 5, 产品备注 = 备注 = 8
产品名称 = name0, 产品价格 = 3, 产品备注 = 备注 = 9
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 9
产品名称 = name1, 产品价格 = 8, 产品备注 = 备注 = 5
产品名称 = name3, 产品价格 = 0, 产品备注 = 备注 = 8
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 5
产品名称 = name1, 产品价格 = 9, 产品备注 = 备注 = 9

2、max方式

一、输入数据:

产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name0, 产品价格 = 0, 产品备注 = 备注 = 6
产品名称 = name4, 产品价格 = 1, 产品备注 = 备注 = 3
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 6
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 7
产品名称 = name0, 产品价格 = 6, 产品备注 = 备注 = 9
产品名称 = name1, 产品价格 = 3, 产品备注 = 备注 = 4
产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7
产品名称 = name0, 产品价格 = 0, 产品备注 = 备注 = 1
产品名称 = name3, 产品价格 = 1, 产品备注 = 备注 = 1

二、输出结果

产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name4, 产品价格 = 1, 产品备注 = 备注 = 3
产品名称 = name2, 产品价格 = 6, 产品备注 = 备注 = 6
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name1, 产品价格 = 3, 产品备注 = 备注 = 4
产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7
产品名称 = name0, 产品价格 = 7, 产品备注 = 备注 = 0
产品名称 = name3, 产品价格 = 4, 产品备注 = 备注 = 7

flink transformation

相关标签: flink 大数据