欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink教程(四) DataStream 常用算子(上)

程序员文章站 2022-06-17 13:23:40
...

一、前言

算子会将一个或多个DataStream转换成一个新的DataStream。

在工作中使用最多的也就这些DataStream转换算子,学好这些算子是入门Flink的必要。

好在Flink的某些算子和Java8的lambda函数很像,这便于理解。下面我会先介绍Java的语法,再介绍Flink的语法,由浅入深。

下面可以从图中看到DataStream和不同Stream之间,经过不同算子可以相互转换。

Flink教程(四) DataStream 常用算子(上)

二、Map

2.1 Java Lambda的Map

Map对于Stream中包含的元素使用给定的转换函数进行转换操作,新生成的Stream只包含转换生成的元素。

简单来说,Map是映射的意思,可将T类型输入,转成R类型的输出。
Flink教程(四) DataStream 常用算子(上)
如下示例是获取WordCount对象集合中的word字段集合。List转换成List代码如下:

public class MapJavaDemo {

    public static void main(String[] args) {

        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Elasticsearch", 1));

        //将每个字段从WordCount类型转成String类型,生成List<String>
        List<String> words = wordCountList.stream()
                          .map(WordCount::getWord)
                          .collect(Collectors.toList());

        System.out.println(words);
    }
}

运行结果如下:

[Flink, Elasticsearch]

上面.map(WordCount::getWord)也可以写成.map(a -> a.getWord()),WordCount::getWord是一种简写方法罢了,a -> a.getWord()表达的更加直白。

2.2 Flink的Map

转换类型:DataStream → DataStream

  • 说明:读取一个元素并生成一个新的元素,例如
  • 举例:
输入 map转换 输出
1,2,3 乘以2 2,4,6
a,b,b 添加一个元素1,组成Tuple2<String, Integer> (a,1),(b,1),(b,1)

下面代码举例

  • 第一步:获取WordCount对象中word集合
  • 第二步:将每个count * 2
public class MapFlinkDemo {

    public static void main(String[] args) throws Exception {

        //创建环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Elasticsearch", 1));

        //设置数据源
        DataSource<WordCount> dataSource = env.fromCollection(wordCountList);

        //转换1:获取WordCount对象中word集合
        MapOperator<WordCount, String> words = dataSource.map(a -> a.getWord());

        //输出结果1
        words.print();

        //转换2:将每个count * 2
        MapOperator<WordCount, Object> doubleCount = dataSource.map(a -> {
            a.setCount(a.getCount() * 2);
            return a;
        });

        //输出结果2
        doubleCount.print();
    }
}

运行结果如下:

Flink
Elasticsearch
WordCount{word='Flink', count=6}
WordCount{word='Elasticsearch', count=2}

三、FlatMap

FlatMap也称扁平化map,它可把多个集合放到一个集合里。
举例说明:

  • WordCount程序中是把输入值,按照空格(" ")切分。
  • “i love you"和"you love me” -> [“i”,“love”,“you”]和[“you”,“love”,“me”]
  • Arrays.stream()将上面2个数组生成的2个Stream流,
  • flatMap()把2个Stream流合成1个Stream

3.1 Java Lambda的FlatMap

public class FlatMapJavaDemo {

    public static void main(String[] args) {

        List<String> wordCountList = new ArrayList<>();
        wordCountList.add("i love you");
        wordCountList.add("you love me");

        Map<String, Long> collect = wordCountList.stream()

                //这一行会生成2个String[]:["i","love","you"]和["you","love","me"]
                .map(a -> a.split(" "))

                //Arrays.stream()将上面2个数组生成的2个Stream<String>流,
                //flatMap()把2个Stream<String>流合成1个Stream<String>
                .flatMap(Arrays::stream)
                
                //在1个Stream<String>上面根据单词聚合统计
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
        
        System.out.println(collect);
    }
}

运行结果如下:

{love=2, me=1, i=1, you=2}

3.2 Flink的FlatMap

  • 转换类型:DataStream → DataStream
  • 说明:多组数据->生成多个流->合并成一个流
  • 举例:
输入 flatMap转换 输出
“I love coding”, “I love flink” 切分后,组成Tuple2<String, Integer> (flink,1)2个(love,1)2个(I,1)(coding,1)
public class FlatMapFlinkDemo {

    public static void main(String[] args) throws Exception {

        //创建环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<String> wordCountList = new ArrayList<>();
        wordCountList.add("i love you");
        wordCountList.add("you love me");

        //设置数据源
        DataSource<String> dataSource = env.fromCollection(wordCountList);

        //数据转换
        FlatMapOperator<String, String> wordStream = dataSource.flatMap(
                (FlatMapFunction<String, String>) (in, out) -> {
                    Arrays.stream(in.split(" ")).forEach(out::collect);
                }
        ).returns(Types.STRING);

        wordStream.print();
    }
}

四、Filter

4.1 Java Lambda的Filter

这里的filter方法接收的是Predicate<? super T> predicate,这个返回boolean类型。
Flink教程(四) DataStream 常用算子(上)

public class FilterJavaDemo {

    public static void main(String[] args) {

        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Elasticsearch", 1));

        List<WordCount> collect = wordCountList.stream().filter(a -> a.getCount() > 1).collect(Collectors.toList());

        System.out.println(collect);
    }
}
[WordCount{word='Flink', count=3}]

4.2 Flink的Filter

  • 转换类型:DataStream → DataStream
  • 说明:该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出
  • 举例:
输入 flatMap转换 输出
1, 2, 3, 4, 5, 6 找到奇数 1,3,5
public class FilterFlinkDemo {

    public static void main(String[] args) throws Exception {

        //创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Elasticsearch", 1));

        DataSource<WordCount> source = env.fromCollection(wordCountList);

        FilterOperator<WordCount> filter = source.filter(a -> a.getCount() > 1);

        filter.print();
    }
}

运行结果如下:

WordCount{word='Flink', count=3}

五、KeyBy

在Flink中KeyBy就是分组,如果是DataSet用groupBy,是DataStream用keyBy。

5.1 Java lambda的groupingBy

先按第一个字段分组,再按第二个字段求和

public class GroupByJavaDemo {

    public static void main(String[] args) {

        List<WordCount> wordCountList = new ArrayList<>();
        wordCountList.add(new WordCount("Flink", 3));
        wordCountList.add(new WordCount("Flink", 2));
        wordCountList.add(new WordCount("Elasticsearch", 1));
        wordCountList.add(new WordCount("Elasticsearch", 5));

        //先按第一个字段分组,再按第二个字段求和
        Map<String, Integer> groupByThenSum = wordCountList.stream()
                .collect(
                        Collectors.groupingBy(
                                WordCount::getWord,
                                Collectors.summingInt(WordCount::getCount)
                        )
                );

        System.out.println(groupByThenSum);
    }
}
{Elasticsearch=6, Flink=5}

5.2 Flink的KeyBy

  • 转换类型:DataStream → KeyedStream

  • 说明:具有相同key的所有记录会分配给到同一分区,类似SQL的group by,在内部,keyBy()是使用hash分区实现

  • 如果是元祖,keyBy()里是下标,如keyBy(0), keyBy(0,1)

  • 如果是POJO,keyBy()里是字段名,如keyBy(“word”),如果根据多个字段分组,keyBy(“field1”, “field2”)

  • 举例:WordCount程序

public class StreamWordCountLambdaBetter {

    public static void main(String[] args) throws Exception {

        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //创建数据源
        DataStreamSource<String> lines = env.socketTextStream(BaseConstant.URL, BaseConstant.PORT);

        //将一行数据,按照空格分隔,将每个字符转成Tuple2.of(s, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
                (String line, Collector<Tuple2<String, Integer>> out) -> {
                    Arrays.stream(line.split(" ")).forEach(thisWord -> {
                        out.collect(Tuple2.of(thisWord, 1));
                    });
                }
        ).returns(Types.TUPLE(Types.STRING, Types.INT));

        //统计操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumResult = wordAndOne
                .keyBy(value -> value.f0)
                .sum(1);

        //sink
        sumResult.print();

        //执行
        env.execute("StreamWordCountLambdaBetter");
    }
}

六、Aggregations

6.1 Java Lambda的聚合函数

Java8中聚合函数非常简单,很容易理解和上手。

long count = pigs.stream().filter(a -> a.getAge() > 5).count();
System.out.println("age > 5的人数 = " + count);

//limit,取前几个
System.out.println("top2");
Stream<Pig> top2Pigs = pigs.stream().sorted(comparing(Pig::getAge)).limit(2);
top2Pigs.forEach(System.out::println);

int sumAge = pigs.stream().mapToInt(Pig::getAge).sum();
int maxAge = pigs.stream().mapToInt(Pig::getAge).max().getAsInt();
int minAge = pigs.stream().mapToInt(Pig::getAge).min().getAsInt();
double avgAge = pigs.stream().mapToInt(Pig::getAge).average().getAsDouble();

System.out.println("sumAge = " + sumAge);
System.out.println("maxAge = " + maxAge);
System.out.println("minAge = " + minAge);
System.out.println("avgAge = " + avgAge);

Optional<Pig> pigMaxAgeOptional = pigs.stream().collect(Collectors.maxBy(comparing(Pig::getAge)));
if (pigMaxAgeOptional.isPresent()){
    System.out.println("maxAge = " + pigMaxAgeOptional.get().getAge());
}

6.2 Flink中的Aggregations

Flink中聚合有如下
其中要注意:

  • min根据指定的字段取最小,它只返回最小的那个字段,而不是整个数据元素,对于其他的字段取了第一次取的值,不能保证每个字段的数值正确。
  • minBy根据指定字段取最小,返回的是整个元素。
keyedStream.sum(0);
keyedStream.sum(“key”);
keyedStream.min(0);
keyedStream.min(“key”);
keyedStream.max(0);
keyedStream.max(“key”);
keyedStream.minBy(0);
keyedStream.minBy(“key”);
keyedStream.maxBy(0);
keyedStream.maxBy(“key”);
public class StreamMinTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List data = new ArrayList<Tuple3<String, String, Integer>>();
        data.add(new Tuple3<>("男", "老王", 80));
        data.add(new Tuple3<>("男", "小王", 25));
        data.add(new Tuple3<>("男", "老李", 85));
        data.add(new Tuple3<>("男", "小李", 20));

        DataStreamSource peoples = env.fromCollection(data);
		
		//求年龄最小的人
        SingleOutputStreamOperator minResult = peoples.keyBy(0).min(2);
        minResult.print();

        env.execute("StreamMinTest");
    }
}

返回结果如下:

1> (男,老王,80)
1> (男,老王,25)
1> (男,老王,25)#年龄算对了,但是别的字段还是第一个老王
1> (男,老王,20)#年龄最小的居然还是第一个老王?

七、总结

本篇博客只是总结了部分Java8和Flink的算子,他们在Java和Flink上有共性,所以拿来一起讲,
这个是网上别的Flink博客所没有的。
感觉学习还是温故而知新,不能停下学习的脚步,也得时不时的回顾以前的知识,才能有新的收获。
下一篇博客得讲讲Transformations 常用算子中的Reduce、Fold、Union、Connect、Split & Select。