欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

(1)Flink学习-开始篇

程序员文章站 2022-06-17 09:05:49
...

目录

 

1、下载文件

2、安装

2.1 本地安装

2.2 集群安装

2.2.1 Standalone模式

2.2.2 Flink on Yarn模式

2.2.3 其他模式

3 第一个例子

3.1pom.xml

3.2 java

3.3 测试


1、下载文件

flink-1.9.2-bin-scala_2.12.tgz 

国外镜像下载太慢,可以采用国内的镜像,例如清华大学开源软件镜像站:https://mirrors.tuna.tsinghua.edu.cn/

2、安装

2.1 本地安装

(1)Flink学习-开始篇

(1)Flink学习-开始篇

2.2 集群安装

2.2.1 Standalone模式

独立部署模式,不依赖于其他平台。

2.2.2 Flink on Yarn模式

依赖于Hadoop集群

2.2.3 其他模式

Mesos

Docker

K8s

AWS

 

3 第一个例子

3.1pom.xml

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.9.2</version>
        </dependency>

3.2 java

注意项目不要在springboot框架的基础上执行,因为springboot会运行tomcat容器等,会引起flink rn -d -c XXXXX 的错误。

package com.steven.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.List;

public class Demo1 {
    public static void main(String[] args) {
        //获取执行环境 ExecutionEnvironment (批处理用这个对象)
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //加载数据源到 DataSet
        DataSet<String> text = env.readTextFile("D:\\test.txt");
        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        //s 即从文本中读取到一行字符串,按空格分割后得到数组tokens
                        String[] tokens = s.toLowerCase().split("\\s+");
                        for (String token : tokens) {
                            if (token.length() > 0) {
                                //初始化每一个单词,保存为元祖对象
                                collector.collect(new Tuple2<String, Integer>(token, 1));
                            }
                        }
                    }
                })
                        .groupBy(0) //0表示Tuple2<String, Integer> 中的第一个元素,即分割后的单词
                        .aggregate(Aggregations.SUM, 1); //同理,1表示Tuple2<String, Integer> 中的第二个元素,即出现次数

        try {
            //从DataSet 中获得集合,并遍历
            List<Tuple2<String, Integer>> list = counts.collect();
            for (Tuple2<String, Integer> tuple2 : list) {
                System.out.println(tuple2.f0 + ":" + tuple2.f1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

3.3 测试

在D:\\test.txt中加入数据

hello world hello
flink demo

 

运行结果如下:

flink:1
world:1
hello:2
demo:1

 

相关标签: Flink flink