Flink in Action: Flink Environment Setup on macOS & WordCount Implementation
Flink environment setup
Java environment
I use JDK 1.8 here; download a JDK and set the environment variables yourself.
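If you need a hint, macOS ships a built-in java_home utility that resolves an installed JDK; a minimal sketch, assuming a JDK 8 install:
$: export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)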
$: java -version
java version "1.8.0_221"
Java(TM) SE Runtime Environment (build 1.8.0_221-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
Flink environment
At the time of writing, Flink has reached 1.10.0; we use this latest version for the setup and download the release package from the Apache Flink downloads page.
Unpack it to wherever you usually keep your software, then update the environment variables:
export FLINK_HOME=/Users/lidongmeng/software/flink-1.10.0
export PATH=$PATH:$FLINK_HOME/bin
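To make these changes stick across sessions, append them to your shell profile and reload it (a sketch; ~/.bash_profile assumes bash — on newer macOS versions the default shell is zsh, so use ~/.zshrc instead):
$: echo 'export FLINK_HOME=/Users/lidongmeng/software/flink-1.10.0' >> ~/.bash_profile
$: echo 'export PATH=$PATH:$FLINK_HOME/bin' >> ~/.bash_profile
$: source ~/.bash_profile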
Check the Flink version in the terminal:
$: flink --version
Version: 1.10.0, Commit ID: aa4eb8f
Starting Flink
Let's start Flink first and have a look at the web UI it ships with.
$: ~/software/flink-1.10.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost.
Starting taskexecutor daemon on host localhost.
Once it is up, we can open the web UI in a browser (by default it listens on http://localhost:8081):
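To sanity-check from the terminal instead, Flink's REST API exposes a cluster overview on the same port, and stop-cluster.sh shuts everything down again when you are done (URL assumes the default port):
$: curl -s http://localhost:8081/overview
$: ~/software/flink-1.10.0/bin/stop-cluster.sh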
Flink WordCount implementation
The full code is available at: https://github.com/ldm0213/flink-repos
Create a Maven project
I develop in IntelliJ IDEA; create a new Maven project:
Specify the groupId and artifactId:
Add Maven dependencies
Next, add the dependencies Flink needs:
<properties>
    <flink.version>1.10.0</flink.version>
    <scala.compiler.version>2.11</scala.compiler.version>
    <!-- Java version used by the Maven compiler -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.compiler.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- The next two dependencies are needed for Flink's logs to show up -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
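For this bare-bones job, the plain JAR produced by mvn clean package is enough, since the cluster already provides the Flink runtime; once you add connectors or other third-party libraries, you will want a fat JAR. A minimal maven-shade-plugin sketch, where the mainClass value is an assumption about this project's package layout:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- assumed main class; adjust to your package -->
                                <mainClass>com.flink.FlinkWordCountMain</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>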
Writing the program
The job reads data from a socket stream, splits each line into words on spaces, counts the occurrences of each word, and prints the result.
import com.flink.transformation.LineSplitMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkWordCountMain {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 8823;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines of text from the socket opened by nc (see below).
        DataStreamSource<String> stream = env.socketTextStream(HOST, PORT);

        // Split lines into (word, 1) tuples, group by the word (tuple field 0),
        // and keep a running sum of the counts (tuple field 1).
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream
                .flatMap(new LineSplitMapFunction())
                .keyBy(0)
                .sum(1);

        sum.print();
        env.execute("Flink word-count example");
    }
}
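As an aside, the same pipeline can be written inline with a lambda instead of a separate class; a sketch under the same setup. Because Java erases the Tuple2 generics at runtime, Flink needs an explicit returns(...) type hint for the lambda:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

// Drop-in replacement for the flatMap/keyBy/sum chain in main():
stream
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split(" ")) {
                out.collect(Tuple2.of(word, 1));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(0)
        .sum(1)
        .print();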
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) {
        // Emit a (word, 1) tuple for every space-separated word in the line.
        for (String word : line.split(" ")) {
            collector.collect(new Tuple2<>(word, 1));
        }
    }
}
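A side benefit of keeping the splitting logic in its own class is that it can be unit-tested without starting a cluster; a minimal sketch using Flink's ListCollector helper (the input line is arbitrary, and the assertions are up to your test framework):
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.java.tuple.Tuple2;

// Feed one line through the function and collect its output into a list.
List<Tuple2<String, Integer>> out = new ArrayList<>();
new LineSplitMapFunction().flatMap("hello world", new ListCollector<>(out));
// out now holds (hello,1) and (world,1)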
Submitting & running the program
Since the data source is a socket, open a terminal and start an nc process listening on port 8823:
$: nc -l 8823
hello world
hello world
what ever
Run locally
Run the main method of FlinkWordCountMain directly:
[Keyed Aggregation -> Sink: Print to Std. Out (7/8)] INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
[Keyed Aggregation -> Sink: Print to Std. Out (5/8)] INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
3> (hello,1)
5> (world,1)
6> (ever,1)
4> (what,1)
3> (hello,2)
5> (world,2)
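The N> prefix on each result line is the index of the parallel subtask that printed it, and the counts keep growing because sum(1) maintains running per-key state across the stream.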
Submitting the job
- To run the job from the web UI, first package it:
mvn clean package
- Upload the JAR:
- Specify the run parameters & submit the job:
- The running job in the UI:
- Check the output (when the job runs on the cluster, print() writes to the TaskManager's *.out file under $FLINK_HOME/log rather than to your IDE console):
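Alternatively, you can skip the web UI and submit the packaged JAR with the flink CLI; a sketch, where the JAR file name and the main-class package are assumptions about this project:
$: flink run -c com.flink.FlinkWordCountMain target/flink-repos-1.0-SNAPSHOT.jar  # names are illustrative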