4 Flink 1.10.1: Batch and Stream Processing of WordCount
1 Batch WordCount with Flink 1.10.1
1.1 pom.xml configuration
Package name: com.hik.myFlink.Flink74
<groupId>com.hik.myFlink</groupId>
<artifactId>Flink74</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Flink74</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.10.1</flink.version>
</properties>
<dependencies>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- log4j and slf4j packages; if you do not want to see log output in the console, comment out the dependencies below -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<!-- When set to false, the "-jar-with-dependencies" suffix is dropped from the assembled jar name -->
<!--<appendAssemblyId>false</appendAssemblyId>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.hik.myFlink.FlinkStreamTest</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1.2 The FlinkBatchTest class for batch processing
package com.hik.myFlink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
/**
 * @Author lyz
 * @Date 2020/7/15
 */
/**
 * Flink can process batch data as well as streaming data.
 * Flink vs. Spark Streaming vs. Storm:
 * 1. Storm: low throughput, strong real-time behavior
 * 2. Spark Streaming: micro-batching, high throughput, weaker real-time behavior (latency around 5s)
 * 3. Flink: combines the strengths of Storm and Spark Streaming -- high throughput and strong real-time behavior.
 *
 * Flink distinguishes bounded and unbounded data:
 * Bounded data: batch data
 * Unbounded data: streaming data
 *
 * =======================
 *
 * For batch processing the underlying abstraction is DataSet, and you must create: ExecutionEnvironment.getExecutionEnvironment();
 * For stream processing the underlying abstraction is DataStream, and you must create: StreamExecutionEnvironment.getExecutionEnvironment();
 * Code flow:
 * 1. Create the ExecutionEnvironment
 * 2. Source -> transformation -> Sink
 * 3. env.execute() is required to trigger the transformations for lazy sinks; in batch processing the eager sinks count(), collect() and print() trigger execution by themselves (see the sketch after this class).
 *
 * Flink has no global sort; sorting is only possible within each partition
 * (which is why the job below sorts with parallelism 1).
 */
public class FlinkBatchTest {
    public static void main(String[] args) throws Exception {
        // batch environment: the entry point of the DataSet API
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the input file as a DataSet of lines
        DataSource<String> dataSource = env.readTextFile("./data/input");
        // split each line into individual words
        FlatMapOperator<String, String> flatMap = dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] split = line.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });
        // map each word to a (word, 1) tuple
        MapOperator<String, Tuple2<String, Integer>> map = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        // group by the word (tuple field 0) and sum the counts (field 1)
        UnsortedGrouping<Tuple2<String, Integer>> groupBy = map.groupBy(0);
        DataSet<Tuple2<String, Integer>> sum = groupBy.sum(1).setParallelism(1);
        // sort within the single partition by count, descending; with parallelism 1 this yields a global order
        SortPartitionOperator<Tuple2<String, Integer>> sum1 = sum.sortPartition(1, Order.DESCENDING);
        // writeAsText is a lazy sink, so env.execute() is required to actually run the job
        DataSink<Tuple2<String, Integer>> tuple2DataSink = sum1.writeAsText("./data/result", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        env.execute("myflink");
    }
}
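As noted in the comment above, only lazy DataSet sinks such as writeAsText() need env.execute(); the eager sinks count(), collect() and print() submit the job themselves. Below is a minimal sketch of that behavior; the class name and sample words are made up purely for illustration:
package com.hik.myFlink;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
public class FlinkBatchEagerSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // a tiny in-memory DataSet instead of a file (illustrative data only)
        DataSet<String> words = env.fromElements("hello", "flink", "hello");
        // print() is an eager sink: it triggers execution itself, no env.execute() needed
        words.print();
        // count() also triggers execution and returns the result directly
        System.out.println("total words: " + words.count());
    }
}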
1.3 The input file ./data/input is as follows:
1.4 The result in ./data/result is as follows:
2 Streaming WordCount with Flink 1.10.1
2.1 The pom.xml in IDEA is the same as shown above
2.2 Deploy a Flink cluster on the server; I used flink-1.10.1-bin-scala_2.11.tgz
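To run the streaming job on that cluster rather than inside the IDE, you can build the fat jar with the assembly plugin configured above and submit it with the Flink CLI; the main class is the FlinkStreamTest shown in the next subsection. A rough sketch, assuming the default jar name produced by this pom and that the second command is run from the Flink installation directory:
mvn clean package
bin/flink run -c com.hik.myFlink.FlinkStreamTest Flink74-1.0-SNAPSHOT-jar-with-dependencies.jar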
2.3 The FlinkStreamTest class for stream processing
package com.hik.myFlink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * @Author lyz
 * @Date 2020/7/8
 */
public class FlinkStreamTest {
    public static void main(String[] args) throws Exception {
        // streaming environment: the entry point of the DataStream API
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read lines from a socket on host Hlink161, port 9999
        DataStreamSource<String> ds = env.socketTextStream("Hlink161", 9999);
        // split each line into individual words
        SingleOutputStreamOperator<String> flatMap = ds.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] split = line.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });
        // map each word to a (word, word, 1) tuple
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = flatMap.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String word) throws Exception {
                return new Tuple3<>(word, word, 1);
            }
        });
        // key the stream by the word (tuple field 0) and keep a running sum of field 2
        KeyedStream<Tuple3<String, String, Integer>, Tuple> keyBy = map.keyBy(0);
        DataStream<Tuple3<String, String, Integer>> sum = keyBy.sum(2).setParallelism(1);
        //sum.writeAsText("./data/result1", FileSystem.WriteMode.OVERWRITE);
        sum.print();
        // for a DataStream job, env.execute() is always required to start it
        env.execute("my first flink stream");
    }
}
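The pom.xml above already pulls in flink-connector-kafka_2.11, although this example reads from a socket. Below is a minimal sketch of what swapping the socket source for a Kafka source could look like; the broker address, topic name, group id and class name are placeholders, not values from this article:
package com.hik.myFlink;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
public class FlinkKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka consumer properties; broker address and group id are placeholders
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "Hlink161:9092");
        props.setProperty("group.id", "wordcount-group");
        // read each Kafka record as a plain string line (topic name is a placeholder)
        DataStreamSource<String> ds = env.addSource(
                new FlinkKafkaConsumer<>("wordcount-topic", new SimpleStringSchema(), props));
        // from here the flatMap / map / keyBy / sum pipeline is identical to FlinkStreamTest
        ds.print();
        env.execute("kafka source sketch");
    }
}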
2.4 Open a port on the server and type input data in real time
nc -lk 9999
2.5 The console output is as follows: