欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

flink实战--MAC-flink环境搭建&wordcount实现

程序员文章站 2022-06-16 15:20:53
...

flink环境搭建

java环境

这里我使用的是jdk 1.8,下载jdk,自行设置环境变量。

$: java -version
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)

flink环境

目前Flink已经发展到1.10.0,我们采用最新的版本进行环境搭建,我们从这里下载最新版本的Flink安装包。
flink实战--MAC-flink环境搭建&wordcount实现
解压放到我们经常存放software的地方,然后进行环境变量的更改:

FLINK_HOME=/Users/lidongmeng/software/flink-1.10.0
PATH=$PATH:$FLINK_HOME/bin

terminal里面进行查看Flink version信息:

$: flink --version
Version: 1.10.0, Commit ID: aa4eb8f

flink启动

让我们先启动起来flink,看一看flink自身提供的web UI。

$: ~/software/flink-1.10.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost.
Starting taskexecutor daemon on host localhost.

启动后,我们可以在web界面上面查看:
flink实战--MAC-flink环境搭建&wordcount实现

flink–wordcount实现

详细代码参见:https://github.com/ldm0213/flink-repos

创建maven工程

笔者使用IDEA作为编辑器进行开发,创建maven项目:
flink实战--MAC-flink环境搭建&wordcount实现
指定groupId和artifactId:
flink实战--MAC-flink环境搭建&wordcount实现

添加maven依赖

接下来我们需要添加flink需要的包依赖:

<properties>
    <flink.version>1.10.0</flink.version>
    <scala.compiler.version>2.11</scala.compiler.version>
    
    <!-- 指定maven编译时候的java版本 -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.compiler.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
	<!--加入下面两个依赖才会出现 Flink 的日志出来-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

程序编写

我们要实现的是从socket stream里面读取数据,对每行数据进行按照空格分隔成单词,统计各个单词出现的次数,并进行输出。

import com.flink.transformation.LineSplitMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkWordCountMain {
    public static String HOST = "127.0.0.1";
    public static Integer PORT = 8823;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> stream = env.socketTextStream(HOST, PORT);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum =
                stream.flatMap(new LineSplitMapFunction()).
                        keyBy(0).
                        sum(1);
        sum.print();

        env.execute("Flink word-count example");
    }
}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {

    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
        String[] items = s.split(" ");

        for (String item: items) {
            collector.collect(new Tuple2<>(item, 1));
        }
    }
}

程序提交&运行

由于我们的数据源是socket中数据,所以我们终端里面在端口8823开启一个nc进程:

$:nc -l 8823
hello world
hello world
what ever

本地运行

直接运行FlinkWordCountMain的main方法:

[Keyed Aggregation -> Sink: Print to Std. Out (7/8)] INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
[Keyed Aggregation -> Sink: Print to Std. Out (5/8)] INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
3> (hello,1)
5> (world,1)
6> (ever,1)
4> (what,1)
3> (hello,2)
5> (world,2)

提交任务

  1. 提交任务到web UI上运行,先打包:
mvn clean package
  1. 包上传:
    flink实战--MAC-flink环境搭建&wordcount实现
  2. 指定运行参数&提交任务:
    flink实战--MAC-flink环境搭建&wordcount实现
  3. 运行界面:
    flink实战--MAC-flink环境搭建&wordcount实现
  4. 结果输出查看:
    flink实战--MAC-flink环境搭建&wordcount实现

参考

  1. http://www.54tianzhisheng.cn/2018/09/18/flink-install/
  2. https://blog.csdn.net/magic_kid_2010/article/details/97135426
  3. https://www.cnblogs.com/ALittleMoreLove/archive/2018/08/09/9449992.html