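Quickly building a Flink skeleton project with Maven in IntelliJ IDEA
Writing a Flink program in Scala
This project uses Flink 1.11.2, the latest version at the time of writing. The Maven project configuration is given below.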
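Create a new Maven project in IntelliJ IDEA.
Check "Create from archetype", then click the "Add Archetype" button.
Enter org.apache.flink as the GroupId, flink-quickstart-scala as the ArtifactId, and 1.11.2 as the Version, then click OK.
Click the right-pointing arrow to expand the list, select flink-quickstart-scala:1.11.2 (to build a Java project instead, replace scala with java), and click Next.
Enter FlinkTest as the Name, com.zhengkw as the GroupId, and FlinkTest as the ArtifactId, then click Next.
It is best to use IDEA's default Maven installation, Bundled (Maven 3). Click Finish and wait a moment for the project to be created.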
Adjust the pom file to match your environment. Because I use Scala 2.12.12, I changed the properties section as follows:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.2</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.12</scala.version>
<log4j.version>2.12.1</log4j.version>
</properties>
Write the WordCount.scala program
package com.imprexion
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
 * @ClassName: WordCount
 * @author: zhengkw
 * @description:
 * @date: 2020/10/22 11:13 AM
 * @version: 1.0
 * @since: jdk 1.8 scala 2.11.8
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val text: DataStream[String] = env.readTextFile("E:\\IdeaWorkspace\\Flink-DataMoveForHive2Kafka\\src\\main\\resources\\helloworld.txt")
    text.flatMap { r => r.split("\\s") }
      .map(w => WordWithCount(w, 1))
      // group by the word field (position 0), which triggers a shuffle
      .keyBy(0)
      // aggregate the count field (position 1), similar to reduce
      .sum(1)
      .print()
    env.execute()
  }

  case class WordWithCount(word: String, count: Int)
}
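As a variation on the program above, the grouping can be expressed with a typed key selector and a field name instead of positions. The following is only a sketch under assumptions: the object name WordCountByKeySelector and the helper tokenize are made up for illustration, and the input comes from in-memory elements rather than the text file so that it does not depend on a local path.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical object name, used only for this illustration
object WordCountByKeySelector {
  case class WordWithCount(word: String, count: Int)

  // Hypothetical helper: split a line on whitespace and drop empty tokens
  def tokenize(line: String): Seq[String] =
    line.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Assumption: in-memory sample input instead of readTextFile
    val text: DataStream[String] = env.fromElements("hello world", "hello flink")

    text
      .flatMap(line => tokenize(line))
      .map(w => WordWithCount(w, 1))
      // key by the word field through a function, not by position
      .keyBy(_.word)
      // sum the field named "count" of the case class
      .sum("count")
      .print()

    env.execute("WordCount with key selector")
  }
}
```

Keying with a function keeps the key type visible to the compiler, so renaming or reordering the case class fields is less likely to break the job silently.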
Problems encountered
[Flink Scala] No implicits found for parameter evidence$12
Explanation from the official documentation
1: A frequent reason is that the code that generates the TypeInformation has not been imported. Make sure to import the entire flink.api.scala package.
2: Another common cause are generic methods, which can be fixed as described in the following section.
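Cause: the Flink Scala API package is not imported in the current scope, so the compiler cannot find the implicit TypeInformation.
Fix: add the following import: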
import org.apache.flink.api.scala._
That resolves the error.
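Both causes quoted from the official documentation can be illustrated in a few lines. This is a sketch under assumptions, not code from the original project: the object ImplicitEvidenceSketch, the value stringInfo and the helper firstOf are hypothetical names. It uses the streaming package's wildcard import, which, as far as I know, also provides the implicit TypeInformation macro.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Hypothetical object name, used only for this illustration
object ImplicitEvidenceSketch {
  // Cause 1: this line compiles only because the wildcard import above
  // brings the implicit TypeInformation macro into scope.
  val stringInfo: TypeInformation[String] = implicitly[TypeInformation[String]]

  // Cause 2: inside a generic method T is unknown, so the evidence is
  // requested from the caller through a context bound.
  def firstOf[T: TypeInformation](input: DataStream[(T, Int)]): DataStream[T] =
    input.map(pair => pair._1)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumption: small in-memory sample input
    val pairs: DataStream[(String, Int)] = env.fromElements(("hello", 1), ("flink", 2))
    firstOf(pairs).print()
    env.execute("implicit TypeInformation sketch")
  }
}
```

Removing the wildcard import should reproduce a "No implicits found" style error on the stringInfo line, and dropping the context bound from firstOf should produce a similar error on the map call.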