Getting Started with Flink: WordCount (Scala)
2022-06-17 10:20:47
Stream processing
I. Create a Maven project
1. Dependencies in pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
</dependencies>
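2. Add the Scala dependency
Right-click the FlinkDemo project -> Open Module Settings -> click the "+" button to add it.
3. Under the scala source directory, create a package named com.xxxx.xxxx, then create a new Scala class named FlinkDemo in that package and choose Object as its kind.
4. The code is as follows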
package com.smxy.flinkdemo

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkDemo {
  def main(args: Array[String]): Unit = {
    // Read the socket port from the --port argument, falling back to 9000
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("no port set. use default 9000")
        9000
    }
    // Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect to the socket for input; change the IP to that of your own virtual machine
    val text = env.socketTextStream("192.168.1.15", port, '\n')
    // Parse the data (flatten it), group, window, and aggregate
    val windowCounts = text
      .flatMap(line => line.split("\\s")) // split each line into words
      .map(w => WordwithCount(w, 1)) // turn each word into the form (word, 1)
      .keyBy("word") // group by word
      .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 s, slide interval 1 s
      .sum("count") // sum the counts per word
    windowCounts.print().setParallelism(1) // print to the console
    // Run the job
    env.execute("Socket window count")
  }

  case class WordwithCount(word: String, count: Long)
}
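Because the window is 2 seconds long and slides every 1 second, each input line falls into two overlapping windows. The following is a minimal plain-Scala sketch of that window assignment and the per-window count, with no Flink involved. The object name SlidingWordCountSketch, its helper names, and the non-negative millisecond timestamps are assumptions made for illustration; the job above does not take timestamps from the data.

```scala
// Plain-Scala sketch of a 2 s window sliding every 1 s (no Flink involved).
// Timestamps are hypothetical, non-negative arrival times in milliseconds.
object SlidingWordCountSketch {
  val sizeMs = 2000L  // window length, mirroring Time.seconds(2)
  val slideMs = 1000L // slide interval, mirroring Time.seconds(1)

  // Start times of every window [start, start + sizeMs) that contains ts.
  def windowStarts(ts: Long): Seq[Long] = {
    val lastStart = ts - (ts % slideMs)
    (lastStart until (ts - sizeMs) by -slideMs).toList
  }

  // Count each word once per window that its line belongs to.
  def count(lines: Seq[(Long, String)]): Map[(Long, String), Long] = {
    val assigned = for {
      (ts, line) <- lines
      word <- line.split("\\s").toSeq
      start <- windowStarts(ts)
    } yield (start, word)
    assigned.groupBy(identity).map { case (key, hits) => key -> hits.size.toLong }
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(500L -> "a b", 1500L -> "a")
    count(lines).toSeq.sorted.foreach { case ((start, word), n) =>
      println(s"window [$start, ${start + sizeMs}) $word -> $n")
    }
  }
}
```

With the two sample lines, the word a from both lines lands in the window starting at 0, so that window reports a count of 2 even though each line contains the word only once.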
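5. If the program fails with an error when the following import is used (typically a compile error about a missing implicit TypeInformation)
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
# change the import above to the wildcard import below, which also brings the implicit definitions required by the Scala API into scope
import org.apache.flink.streaming.api.scala._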
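The wildcard import matters because operators such as flatMap and map in the Flink Scala API take an implicit TypeInformation parameter, and the implicit definition that supplies it lives in the package object org.apache.flink.streaming.api.scala rather than next to TypeInformation itself. The following plain-Scala analogy is a sketch of that mechanism only. Every name in it (typeinfo, scalaapi, TypeInfo, Stream, intInfo, ImplicitImportSketch) is hypothetical and none of it is Flink code.

```scala
// Plain-Scala analogy, no Flink involved. Every name below is hypothetical.

// Stand-in for the package that defines TypeInformation.
object typeinfo {
  trait TypeInfo[T] { def name: String }
}

// Stand-in for the package org.apache.flink.streaming.api.scala.
object scalaapi {
  import typeinfo.TypeInfo

  // Stand-in for DataStream: map demands implicit evidence for its result type.
  class Stream[T](val items: Seq[T]) {
    def map[R](f: T => R)(implicit info: TypeInfo[R]): Stream[R] =
      new Stream(items.map(f))
  }

  // Stand-in for the implicit definitions that live next to the API classes.
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] { def name = "Int" }
}

object ImplicitImportSketch {
  import scalaapi._ // wildcard: brings in Stream and intInfo together
  // With only `import scalaapi.Stream`, the map call below would not compile,
  // because no implicit TypeInfo[Int] would be in scope.

  def wordLengths(words: Seq[String]): Seq[Int] =
    new Stream(words).map(_.length).items

  def main(args: Array[String]): Unit =
    println(wordLengths(Seq("flink", "scala")))
}
```

Importing a single class by name gives access to that class but not to the implicit values defined alongside it, which is why the single-class import in step 5 is replaced with the wildcard form.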
II. Open a port on the virtual machine for testing
1. Install nc on the virtual machine
yum install nc -y