
A Flink Starter Program

程序员文章站 2022-06-17 10:25:08

WordCount in Flink

Use IDEA to create a project from the Flink template:

[Screenshot: creating the Flink template project in IDEA]

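Creating a project from this template requires the Scala plugin (please find an installation source for it yourself).

Once that preparation is done, you can get started. After the template project has been created, you will find two template classes: one for stream processing and one for batch processing.

I wrote my code into these classes and ran it:

The following is the batch-processing class.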

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.feng;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Skeleton for a Flink Batch Job.
 *
 * <p>For a tutorial how to write a Flink batch application, check the
 * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution,
 * change the main class in the POM.xml file to this class (simply search for 'mainClass')
 * and run 'mvn clean package' on the command line.
 * @author Feng
 */
public class BatchJob {

	public static void main(String[] args) throws Exception {
		// set up the batch execution environment
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSource<String> dataSource = env.readTextFile("D:\\jee-2019-7-idea-maven-workspace\\flink\\target\\classes" +
				"\\txt\\wordcount.txt", "UTF-8");

		dataSource.flatMap(new WordFlatFunction())
				.groupBy(0)
				.sum(1)
				.print();
	}

	private static class WordFlatFunction implements FlatMapFunction<String, Tuple2<String, Integer>>{
		private static final long serialVersionUID = -868609327661485759L;
		private String space = " ";

		@Override
		public void flatMap(String words, Collector<Tuple2<String, Integer>> collector) throws Exception {
			for (String word : words.toLowerCase().split(space)) {
				collector.collect(new Tuple2<>(word, 1));
			}
		}
	}
}
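
To make the result of the batch job concrete, here is a minimal plain-Java sketch of what flatMap, groupBy(0) and sum(1) compute together. It does not use the Flink API at all; the class name `WordCountSketch` and the sample lines are assumptions made for illustration and are not part of the template project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountSketch {

    /** Counts words the way BatchJob does: lower-case, split on a single space, add 1 per word. */
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split(" ")) {
                // same effect as emitting (word, 1), grouping by field 0 and summing field 1
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {flink=1, hello=2, world=1}
        System.out.println(count(Arrays.asList("Hello Flink", "hello world")));
    }
}
```

Flink distributes this work across parallel tasks, but the per-word totals it prints should match what this loop produces for the same input file.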

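If classes cannot be loaded when you run the code, check the setting shown in the screenshot below (my configuration here is in a runnable state):

[Screenshot: the setting to check in IDEA]

This is a dependency scope problem: the generated project declares the Flink dependencies with the provided scope, so they are not included when the program is run. In IDEA this is typically resolved by the run configuration option that includes dependencies with Provided scope.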
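
If it is unclear whether the Flink classes are actually visible at run time, a small check like the one below can confirm it before you run the job. This is only a diagnostic sketch: `ClasspathCheck` is a hypothetical helper, not something the template generates, and the class name it probes is simply the one imported by BatchJob.

```java
class ClasspathCheck {

    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only look the class up, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // ExecutionEnvironment is the class BatchJob imports from Flink
        String flinkClass = "org.apache.flink.api.java.ExecutionEnvironment";
        System.out.println(flinkClass + " on classpath: " + isOnClasspath(flinkClass));
    }
}
```

Run it with the same run configuration as the job: if it reports false, the provided-scope dependencies are not being added to the classpath.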

Next, run the stream-processing class:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.feng;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


/**
 * Skeleton for a Flink Streaming Job.
 *
 * <p>For a tutorial how to write a Flink streaming application, check the
 * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 * @author Feng
 */
public class StreamingJob {

	public static void main(String[] args) throws Exception {
		// run "nc -l -p 12345" first so that netcat is listening on port 12345
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> dataStream = env.socketTextStream("localhost", 12345);

		String regx = "\\W+";
		DataStream<Tuple2<String, Integer>> dataStream1 = dataStream
			.flatMap((String words, Collector<Tuple2<String, Integer>> collector)->{
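				for (String word : words.toLowerCase().split(regx)) {
					// split can yield an empty token, e.g. for an empty line or a leading non-word character
					if (!word.isEmpty()) {
						collector.collect(new Tuple2<>(word, 1));
					}
				}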
			}).returns(Types.TUPLE(Types.STRING, Types.INT));

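		// group by the first field of the tuple (the word)
		dataStream1.keyBy(0)
			// timeWindow keeps the keying; timeWindowAll would merge all words into one window
			.timeWindow(Time.seconds(1))
			// sum the second field of the tuple (the count)
			.sum(1)
			.print()
			.setParallelism(1);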

		env.execute("Flink Streaming Java API Skeleton");
	}
}
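
The comments in StreamingJob describe the intended result: group by the word, then sum the counts within each one-second window. The following plain-Java sketch simulates that grouping without Flink. The class `TumblingWindowSketch`, the `Event` type and the sample timestamps are all assumptions made for illustration; real Flink windows are driven by time and triggers, which this sketch does not model.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowSketch {

    /** One input record: a word and the time (in milliseconds) at which it arrived. */
    static final class Event {
        final long timestampMillis;
        final String word;

        Event(long timestampMillis, String word) {
            this.timestampMillis = timestampMillis;
            this.word = word;
        }
    }

    /** Assigns each event to a fixed-size window and counts every word per window. */
    static Map<Long, Map<String, Integer>> countPerWindow(Iterable<Event> events, long windowMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            // start of the window this event falls into (timestamps assumed non-negative)
            long windowStart = e.timestampMillis - (e.timestampMillis % windowMillis);
            result.computeIfAbsent(windowStart, k -> new TreeMap<>())
                  .merge(e.word, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {0={a=2, b=1}, 1000={a=1}}
        System.out.println(countPerWindow(Arrays.asList(
                new Event(100, "a"), new Event(900, "a"),
                new Event(950, "b"), new Event(1200, "a")), 1000));
    }
}
```

Each outer key is a window start time, and each inner map holds the per-word counts for that window, which is the shape of output a keyed one-second window sum is meant to produce.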

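The streaming program reads text from a local port: run netcat in listen mode on that port, and every line you send through it is received by the job. To run this locally you need the Windows build of netcat (https://eternallybored.org/misc/netcat/). Follow that link and download version 1.1.2, extract the archive, and copy all the files inside it into the System32 folder; after that, netcat can be called from cmd.

The command is nc -l -p [port]; here I listen on port 12345. Pay attention to the startup order: open a cmd window and enter the command first, then start your Flink program. The whole process needs no Flink installation (there is no need to download the Flink distribution).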
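
If installing netcat is inconvenient, a small Java program can stand in for it during local testing. The sketch below is not netcat and is not part of the original setup: `LineServer` is a hypothetical class that accepts a single connection on port 12345 (the port StreamingJob connects to) and forwards whatever you type on standard input, one line at a time.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

class LineServer {

    /** Copies every line from input to out, terminated by '\n', flushing after each line. */
    static void forward(Scanner input, PrintWriter out) {
        while (input.hasNextLine()) {
            out.print(input.nextLine());
            out.print('\n');
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // 12345 matches the port passed to socketTextStream in StreamingJob
        try (ServerSocket server = new ServerSocket(12345);
             Socket client = server.accept(); // blocks until the Flink job connects
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8));
             Scanner stdin = new Scanner(System.in)) {
            forward(stdin, out);
        }
    }
}
```

The startup order is the same as with netcat: start this program first so that the port is open, then start the Flink job, then type lines into the console.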