Flink Getting Started in Practice, Part 3: Reading a File as a Source
程序员文章站
2022-03-14 18:15:14
...
This example uses Flink to read user data from a file and convert each line into an instance of a Scala case class.
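- User data: user.csv (columns in order: id, sex, name, age, ts, where ts is a Unix timestamp in seconds; 男 = male, 女 = female)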
1,男,张三,20,1605970941
2,女,莉莉,30,1605970922
3,女,红红,30,1605970913
4,男,李四,24,1605970904
5,男,王五,25,1605970965
6,男,小明,20,1605970946
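The columns in user.csv are ordered id, sex, name, age, ts, while the User case class in the job declares id, name, sex, age, ts. The map step therefore has to pick columns by position rather than copying them in declaration order. A minimal plain-Scala sketch of that mapping, runnable without Flink; the object name CsvColumns, the column constants and parseUser are hypothetical names chosen for illustration:

```scala
// Same shape as the User case class used in the Flink job
case class User(id: String, name: String, sex: String, age: Int, ts: Long)

// Hypothetical helper: column positions in user.csv (id,sex,name,age,ts)
object CsvColumns {
  val Id = 0
  val Sex = 1
  val Name = 2
  val Age = 3
  val Ts = 4

  // Maps one CSV line to a User by column position.
  // Throws on malformed input (too few fields, non-numeric age/ts),
  // just like the map function in the job.
  def parseUser(line: String): User = {
    val arr = line.split(",").map(_.trim)
    User(arr(Id), arr(Name), arr(Sex), arr(Age).toInt, arr(Ts).toLong)
  }
}

object CsvColumnsDemo {
  def main(args: Array[String]): Unit = {
    // prints User(1,张三,男,20,1605970941)
    println(CsvColumns.parseUser("1,男,张三,20,1605970941"))
  }
}
```

Naming the positions keeps the index/field mismatch visible in one place instead of hiding it in a list of bare arr(n) calls.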
- Source code
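package com.day

import org.apache.flink.streaming.api.scala._

// Case class holding one user record
case class User(id: String, name: String, sex: String, age: Int, ts: Long)

// Source example: reading data from a text file
object SourceFile {
  def main(args: Array[String]): Unit = {
    // 1. Get the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Read the file line by line and convert each line into a User.
    //    The path is relative to the working directory (the project root when run from the IDE).
    //    Column order in user.csv is id,sex,name,age,ts, so name is arr(2) and sex is arr(1).
    val filePath = "src/main/resources/user.csv"
    val dataStream = env.readTextFile(filePath)
      .filter(_.trim.nonEmpty) // skip blank lines
      .map(line => {
        val arr = line.split(",").map(_.trim)
        User(arr(0), arr(2), arr(1), arr(3).toInt, arr(4).toLong)
      })

    // 3. Print each record to standard output
    dataStream.print()

    // 4. Trigger execution of the job
    env.execute("SourceFile Job")
  }
}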
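The map function above throws an exception, and so fails the job, as soon as it receives a line that does not have five comma-separated fields or whose age or ts column is not numeric (a header row, for example). One common alternative, sketched here in plain Scala under the assumption that silently dropping such lines is acceptable, is to parse into an Option with scala.util.Try. SafeParse and parseUserOpt are hypothetical names, not part of the original code:

```scala
import scala.util.Try

// Same shape as the User case class in the job above
case class User(id: String, name: String, sex: String, age: Int, ts: Long)

object SafeParse {
  // Hypothetical helper: returns None instead of throwing for malformed lines.
  // Column order in user.csv: id,sex,name,age,ts
  def parseUserOpt(line: String): Option[User] = {
    val arr = line.split(",").map(_.trim)
    if (arr.length != 5) None
    else Try(User(arr(0), arr(2), arr(1), arr(3).toInt, arr(4).toLong)).toOption
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "id,sex,name,age,ts",      // header row: "age" is not an Int, so it is dropped
      "1,男,张三,20,1605970941", // valid line
      "2,女,莉莉"                // too few fields, so it is dropped
    )
    // flatMap keeps only the lines that parsed successfully
    lines.flatMap(line => parseUserOpt(line).toList).foreach(println)
  }
}
```

In the Flink job, a function like this could take the place of the map step, for example env.readTextFile(filePath).flatMap(line => parseUserOpt(line).toList). The trade-off is that bad input disappears quietly instead of stopping the job, so it may be worth logging or counting the dropped lines.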
- pom.xml
<!-- Properties -->
<properties>
    <flink.version>1.11.2</flink.version>
    <!-- Scope of the Flink streaming dependency: compile for running in the IDE; provided is typical when submitting the jar to a Flink cluster -->
    <scope>compile</scope>
</properties>
<dependencies>
    <!-- Flink dependencies (the _2.11 suffix is the Scala binary version) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>${scope}</scope>
    </dependency>
</dependencies>
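- Execution result
Running the job prints one User(...) line per CSV row to the console. When the print sink runs with parallelism greater than 1, each line is prefixed with the 1-based index of the subtask that printed it (such as "3> ").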