Getting Started with Flink
程序员文章站
2022-06-17 10:25:08
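WordCount in Flink
Create a Flink template project in IntelliJ IDEA:
Creating a project from this template requires the Scala plugin (please find an installation source for it yourself).
With the preparation done, you can get started. Once the template project has been created, you will find two template classes in it: one for stream processing and one for batch processing.
Here I put my code into them and run it:
Below is the batch processing class.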
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.feng;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Skeleton for a Flink Batch Job.
*
* <p>For a tutorial how to write a Flink batch application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution,
* change the main class in the POM.xml file to this class (simply search for 'mainClass')
* and run 'mvn clean package' on the command line.
* @author Feng
*/
public class BatchJob {

    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read the input file line by line (adjust the path to your own machine)
        DataSource<String> dataSource = env.readTextFile("D:\\jee-2019-7-idea-maven-workspace\\flink\\target\\classes" +
                "\\txt\\wordcount.txt", "UTF-8");

        dataSource.flatMap(new WordFlatFunction())
                // group by the first tuple field (the word)
                .groupBy(0)
                // sum the second tuple field (the count)
                .sum(1)
                // in the DataSet API print() triggers execution, so env.execute() is not needed
                .print();
    }

    /** Splits each line into lowercase words and emits a (word, 1) tuple per word. */
    private static class WordFlatFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {

        private static final long serialVersionUID = -868609327661485759L;

        private static final String SPACE = " ";

        @Override
        public void flatMap(String words, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : words.toLowerCase().split(SPACE)) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
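To make it clearer what the flatMap, groupBy(0) and sum(1) chain computes, here is a minimal plain-Java sketch that needs no Flink dependency. The class name WordCountSketch and the method countWords are illustrative names of my own, not Flink APIs; the sketch only mirrors the logic (lowercase, split on a single space, emit a count of 1 per word, sum per word) on an in-memory list, and it says nothing about how Flink distributes the work.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Flink class: mirrors flatMap -> groupBy(0) -> sum(1) in memory.
final class WordCountSketch {

    static Map<String, Integer> countWords(List<String> lines) {
        // TreeMap keeps the words sorted so the printed result is stable.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Same tokenization as WordFlatFunction: lowercase, then split on one space.
            for (String word : line.toLowerCase().split(" ")) {
                // Equivalent of emitting (word, 1) and summing the second field per word.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello Flink", "hello world");
        System.out.println(countWords(lines)); // prints {flink=1, hello=2, world=1}
    }
}
```

Flink prints the same information as one tuple per word, for example (hello,2), although the order of the tuples is not guaranteed.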
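If classes cannot be loaded when you run the code, check the setting shown in the screenshot below (with the state shown there, my project runs):
This is a dependency scope problem: the generated project declares the Flink dependencies with the provided scope, so they are not packaged with the job and are not available at runtime by default.
Next, run the stream processing class: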
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.feng;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* Skeleton for a Flink Streaming Job.
*
* <p>For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
* @author Feng
*/
public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // run "nc -l -p 12345" first so that something is listening on port 12345
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // connect to the local socket and read it line by line
        DataStream<String> dataStream = env.socketTextStream("localhost", 12345);

        // split on runs of non-word characters
        String regex = "\\W+";
        DataStream<Tuple2<String, Integer>> dataStream1 = dataStream
                .flatMap((String words, Collector<Tuple2<String, Integer>> collector) -> {
                    for (String word : words.toLowerCase().split(regex)) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                })
                // the lambda loses its generic type information, so declare the result type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // group by the first field of the tuple
        dataStream1.keyBy(0)
                // keyed one-second window; timeWindowAll would ignore the keys and sum across all words
                .timeWindow(Time.seconds(1))
                // sum the second field of the tuple
                .sum(1)
                .print()
                .setParallelism(1);

        env.execute("Flink Streaming Java API Skeleton");
    }
}
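The idea of counting each word separately within one-second windows can be sketched in plain Java without Flink. Everything in this sketch (the class TumblingWindowSketch, its add and result methods, and the hand-written timestamps) is a hypothetical illustration of my own: it assigns each event to the window that starts at the timestamp rounded down to a full second and keeps a per-word count in every window. Flink itself works with processing time and emits results as windows close, which this sketch does not model.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not Flink code: per-word counts in one-second tumbling windows.
final class TumblingWindowSketch {

    static final long WINDOW_MILLIS = 1000L;

    // window start (ms) -> (word -> count)
    private final Map<Long, Map<String, Integer>> windows = new TreeMap<>();

    void add(long timestampMillis, String word) {
        // Round the timestamp down to the start of its one-second window.
        long windowStart = timestampMillis - (timestampMillis % WINDOW_MILLIS);
        windows.computeIfAbsent(windowStart, k -> new TreeMap<>())
                .merge(word, 1, Integer::sum);
    }

    Map<Long, Map<String, Integer>> result() {
        return windows;
    }

    public static void main(String[] args) {
        TumblingWindowSketch sketch = new TumblingWindowSketch();
        sketch.add(100L, "hello");
        sketch.add(900L, "hello");
        sketch.add(1200L, "hello");
        sketch.add(1300L, "flink");
        System.out.println(sketch.result()); // prints {0={hello=2}, 1000={flink=1, hello=1}}
    }
}
```

The two "hello" events in the first second are counted together, while the third one falls into the next window and starts a new count next to "flink".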
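The streaming program reads text from a socket on a local port: netcat listens on that port, and whatever you type into it is sent to the job. To run this locally on Windows you need the Windows build of netcat (https://eternallybored.org/misc/netcat/). Follow the link and download version 1.1.2, extract the archive, and copy all the files inside it into the System32 folder; after that you can use it from cmd.
The command is nc -l -p [port]; I listen on port 12345 here. Mind the start order: open a cmd window and enter the command first, then start your Flink program, because the job connects to the port as soon as it starts and netcat has to be listening by then.
The whole process needs no Flink environment (you do not have to download the Flink archive).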
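If installing netcat is inconvenient, a small Java program can play the same role for a quick local test. This is a stand-in of my own, not the netcat setup described above: the class LineServerSketch and its serveOnce method are hypothetical names, and the sketch assumes the job connects to localhost on port 12345 and reads newline-terminated lines. It waits for a single client and forwards every line typed on standard input to it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for "nc -l -p 12345"; not part of Flink or netcat.
final class LineServerSketch {

    // Waits for one client (for example the Flink job) and forwards every input line to it.
    static void serveOnce(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                // Terminate each record with "\n" explicitly, independent of the platform.
                out.print(line);
                out.print('\n');
                out.flush();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(12345);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on 12345. Start the Flink job, then type lines here.");
            serveOnce(server, stdin);
        }
    }
}
```

The start order stays the same as with netcat: run this program first, then start the Flink job, then type lines into the console.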