
Quickly Scaffolding a Flink Project with a Maven Archetype in IDEA

程序员文章站 2022-07-14 13:39:44

Writing the Flink Program in Scala

This project uses Flink 1.11.2, the latest release at the time of writing. The Maven project configuration is given below.

1. In IntelliJ IDEA, create a new Maven project.
2. Check "Create from archetype", then click the "Add Archetype" button.
3. Enter org.apache.flink for GroupId, flink-quickstart-scala for ArtifactId, and 1.11.2 for Version, then click OK.
4. Click the right arrow to expand the list, select flink-quickstart-scala:1.11.2 (for a Java project, replace scala with java), and click Next.
5. Enter FlinkTest for Name, com.zhengkw for GroupId, and FlinkTest for ArtifactId, then click Next.
6. It is best to use IDEA's built-in Maven: "Bundled (Maven 3)". Click Finish, wait a moment, and the project is created.
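If you prefer the command line, the same skeleton can be generated without IDEA by invoking the archetype plugin directly. This is a sketch using the coordinates from the steps above (GroupId/ArtifactId values are the ones chosen in this walkthrough; adjust to taste):

```shell
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-scala \
  -DarchetypeVersion=1.11.2 \
  -DgroupId=com.zhengkw \
  -DartifactId=FlinkTest \
  -Dversion=1.0-SNAPSHOT \
  -DinteractiveMode=false
```

Running this downloads the archetype from Maven Central and creates a FlinkTest directory containing the ready-to-edit pom and a sample job.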
Adjust the pom file as needed. Since I am using Scala 2.12.12, I made the following changes:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.12</scala.version>
    <log4j.version>2.12.1</log4j.version>
</properties>
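For reference, the dependency section that pairs with these properties typically looks like the following. This is a sketch based on the standard flink-quickstart-scala pom (your generated pom may differ slightly); the versions are resolved from the properties above:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```

Note the provided scope: the Flink runtime supplies these classes on the cluster, so they are excluded from the fat jar but still available when running inside IDEA (the quickstart pom includes a profile for that).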

Writing the WordCount.scala Program

package com.imprexion

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * @author zhengkw
 * @description A simple streaming word count.
 * @date 20/10/22 11:13 AM
 * @version 1.0
 * @since jdk 1.8 scala 2.12.12
 */
object WordCount {
  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val text: DataStream[String] = env.readTextFile("E:\\IdeaWorkspace\\Flink-DataMoveForHive2Kafka\\src\\main\\resources\\helloworld.txt")

    text.flatMap { r => r.split("\\s+") }
      .filter(_.nonEmpty)
      .map(w => WordWithCount(w, 1))
      // group (shuffle) the stream by the word field
      .keyBy(_.word)
      // aggregate the counts, similar to a reduce
      .sum("count")
      .print()
    env.execute()
  }

  case class WordWithCount(word: String, count: Int)

}
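To make the streaming semantics concrete: the keyed sum emits an updated running count every time a word arrives, not a single final total. That incremental behaviour can be sketched in plain Scala without a Flink dependency (WordCountSketch and incrementalCounts are hypothetical names used only for this illustration):

```scala
object WordCountSketch {
  case class WordWithCount(word: String, count: Int)

  // Replays the lines in order and emits the running count for each word
  // occurrence, mirroring what keyBy + sum prints in the Flink job above.
  def incrementalCounts(lines: Seq[String]): Seq[WordWithCount] =
    lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      // carry the current counts map; emit one record per incoming word
      .scanLeft((Map.empty[String, Int], Option.empty[WordWithCount])) {
        case ((counts, _), word) =>
          val c = counts.getOrElse(word, 0) + 1
          (counts.updated(word, c), Some(WordWithCount(word, c)))
      }
      .flatMap(_._2.toList) // drop the initial seed element
}
```

For the input line "hello world hello" this yields WordWithCount("hello",1), WordWithCount("world",1), WordWithCount("hello",2) — the same three records the Flink job prints with parallelism 1.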

Problems Encountered Along the Way

[Flink Scala] No implicits found for parameter evidence$12
Explanation from the official documentation:

1. A frequent reason is that the code that generates the
TypeInformation has not been imported. Make sure to import the entire
flink.api.scala package.

2. Another common cause are generic methods, which can be fixed as
described in the following section.

Cause: the implicits that generate TypeInformation for Scala types are not in scope in the current file.

Fix: add

import org.apache.flink.api.scala._

and the error goes away (for streaming jobs, import org.apache.flink.streaming.api.scala._ provides the same implicits).

Tags: Flink Scala