Writing WordCount in Scala with Flink
程序员文章站
2022-06-17 10:20:41
...
Development tools: IntelliJ IDEA + netcat
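1. Create a new project in IDEA
step1:
Create a new Maven project.
step2:
Fill in the project's basic information (GroupId, ArtifactId, Version).
step3:
Add the Flink and Scala dependencies to the pom file: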
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.11.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.11.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.11.2</version>
    </dependency>
</dependencies>
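The `_2.12` suffix on each artifactId is the Scala binary version those Flink artifacts were built for, so the Scala SDK configured for the project in IDEA should be a 2.12.x release. As a quick sanity check, here is a small sketch that reads the running Scala version from the standard library's `scala.util.Properties`; the object `ScalaVersionCheck` and its helper are hypothetical, not part of Flink.

```scala
// Hypothetical helper, not part of Flink: checks that the Scala library on the
// classpath matches the _2.12 suffix used by the Flink artifacts in the pom.
object ScalaVersionCheck {
  // True when `version` equals the binary version or is a patch release of it (e.g. 2.12.11)
  def matchesBinaryVersion(version: String, binary: String = "2.12"): Boolean =
    version == binary || version.startsWith(binary + ".")

  def main(args: Array[String]): Unit = {
    // versionNumberString is the version of the Scala library actually loaded, e.g. "2.12.11"
    val running = scala.util.Properties.versionNumberString
    println(s"Scala $running matches _2.12 artifacts: ${matchesBinaryVersion(running)}")
  }
}
```

Running it from the same module as the Flink jobs shows whether IDEA picked up a matching Scala SDK.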
2. Batch mode:
hello.txt (placed under src/main/resources, since the program loads it from the classpath):
Hello World
Hello Flink
WordCountBatch.scala
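import org.apache.flink.api.scala._

object WordCountBatch {
  def main(args: Array[String]): Unit = {
    // Locate hello.txt under resources (on the classpath)
    val resource = this.getClass.getClassLoader.getResource("hello.txt")
    require(resource != null, "hello.txt not found; put it under src/main/resources")
    // Get the batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Read the file, split lines into words, drop empty tokens, map to (word, 1), group by word and sum
    val counts = env.readTextFile(resource.getPath)
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    // Print the result. In the DataSet API print() already triggers execution, so env.execute()
    // is not called; calling it after print() would fail because no new data sinks are defined.
    counts.print()
  }
}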
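To see what each operator in the batch pipeline contributes, the same split, filter, map, group and sum steps can be sketched on plain Scala collections, without Flink. This is only an illustration of the logic (the object `WordCountLogic` is hypothetical); Flink's `groupBy(0).sum(1)` does the grouping and summing in parallel over the DataSet, which this local version does not model.

```scala
// Hypothetical local illustration of the WordCountBatch pipeline on Scala collections
object WordCountLogic {
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // split each line into words
      .filter(_.nonEmpty)     // drop empty tokens produced by repeated spaces
      .map((_, 1))            // pair every word with an initial count of 1
      .groupBy(_._1)          // like groupBy(0): group the pairs by word
      .map { case (word, pairs) => word -> pairs.map(_._2).sum } // like sum(1)

  def main(args: Array[String]): Unit = {
    // Same content as hello.txt
    val counts = count(Seq("Hello World", "Hello Flink"))
    counts.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Running `main` prints (Flink,1), (Hello,2) and (World,1), one per line; the sort is only there to make the local output stable.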
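Execution result: each distinct word is printed once with its count, e.g. (Hello,2), (World,1), (Flink,1); the order of the lines may vary.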
3. Streaming mode:
WordCountStream.scala
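package com.emoney.learn

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object WordCountStream {
  def main(args: Array[String]): Unit = {
    // Read host and port from the arguments (--host ... --port ...), defaulting to localhost:6666
    val params = ParameterTool.fromArgs(args)
    val host = params.get("host", "localhost")
    val port = params.getInt("port", 6666)
    // Get the streaming execution environment
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Listen on the host:port socket
    val ds: DataStream[String] = environment.socketTextStream(host, port)
    // Split into words, drop empty tokens, map to (word, 1), key by word and keep a running sum
    ds.flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()
    // Unlike the batch job, a streaming job only starts when execute() is called
    environment.execute("WordCountStream")
  }
}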
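The main behavioral difference from the batch job is that `keyBy(...).sum(1)` on a stream keeps a running total per key and emits an updated record for every incoming element, rather than one final count. A rough local sketch of that behavior, assuming a single ordered sequence of input lines and using an in-memory map in place of Flink's keyed state (the object `RunningWordCount` is hypothetical):

```scala
import scala.collection.mutable

// Hypothetical local model of keyBy(word).sum(1): one running count per word,
// with an updated (word, count) pair emitted for every incoming word
object RunningWordCount {
  def process(lines: Seq[String]): List[(String, Int)] = {
    val state = mutable.Map.empty[String, Int] // stands in for Flink's per-key state
    lines.toList
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map { word =>
        val updated = state.getOrElse(word, 0) + 1
        state(word) = updated
        (word, updated)
      }
  }

  def main(args: Array[String]): Unit =
    // Simulates typing "Hello World" and then "Hello Flink" into nc
    process(Seq("Hello World", "Hello Flink")).foreach(println)
}
```

Running `main` prints (Hello,1), (World,1), (Hello,2), (Flink,1) in that order: the second Hello reports 2 because the count for that key carries over from the first line. In the real job, records for different keys may interleave differently, and with parallelism greater than 1 Flink's print() also prefixes each line with the subtask index.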
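Open the socket port with netcat
step1:
Download the netcat executable from https://eternallybored.org/misc/netcat/
step2:
Unzip the archive and copy nc.exe to C:\Windows\System32 (a directory already on the PATH).
step3:
Open the socket port: press Win+R and run
nc -l -p 6666
Start nc before running WordCountStream, because the socket source connects to localhost:6666 when the job starts.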
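If nc.exe cannot be installed, a small JVM program can play the same role for local testing. The sketch below is a hypothetical stand-in (`LineServer` is not part of Flink or netcat) that covers only the `nc -l -p 6666` use case here: it listens on a port, accepts one client (the Flink socket source) and forwards each line typed on stdin, terminated with `\n`, which is the default delimiter of `socketTextStream`.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets
import scala.io.StdIn

// Hypothetical minimal replacement for `nc -l -p 6666`: serves lines to a single client
object LineServer {
  // Waits for one client on `server`, then writes every line followed by '\n'
  def serve(server: ServerSocket, lines: Iterator[String]): Unit = {
    val client = server.accept()
    val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
    try lines.foreach { line =>
      out.print(line + "\n") // explicit \n instead of println's platform separator (\r\n on Windows)
      out.flush()            // send each line immediately so the streaming job sees it
    } finally client.close()
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(6666)
    // Forward stdin until end of input (readLine returns null)
    try serve(server, Iterator.continually(StdIn.readLine()).takeWhile(_ != null))
    finally server.close()
  }
}
```

As with nc, start `LineServer` first, then launch WordCountStream and type lines into the LineServer console.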
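Run result: type lines such as Hello World into the nc window, and the IDEA console prints an updated (word, count) pair for each word.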