【09】Flink 之 DataSet API(三):DataSet Sink 操作
程序员文章站
2022-07-14 13:49:18
...
1、DataSet Sink 数据输出
在Data Source部分和其他部分使用过写文件和打印操作,代码相同,只对理论进行介绍
1.1、分类
- writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
- writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法
- print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
2、数据输出详解
通过对批量数据的读写(DataSource)及转换(Transformation)操作,最终形成用户期望的结果数据集,然后需要将数据写入不同的外部介质中进行存储,进而完成整改批量数据处理过程。Flink中对应数据输出功能被称为DataSinks操作,和DataSource Operator操作类似,为了能够让用户更加灵活地使用外部数据,Flink抽象出通用的OutputFormat接口,批量数据输出全部实现与OutputFormat接口。Flink内置了常用数据存储介质对应的OutputFormat,如HadoopOutputFormat、JDBCOutputFormat等。同样,用户也可以自定义实现OutputFormat接口。
Flink在DataSet API中的数据输出共分为三种类型:
- 基于文件实现,对应DataSet的write( )方法;
- 基于通用存储介质实现,对应DataSet的output( )方法,如JDBCOutputFormat;
- 客户端输出,直接将DataSet数据从不同的节点收集到Client,如print( )方法
其中,第一种和第三种在之前程序中进行了使用,对第二种进行简单介绍。
2.1、通过输出接口
可以使用自定义OutputFormat方法来定义介质对应的OutputFormat。
// 读取数据集并转换为(word , count) 类型数据
val dataSet:DataSet[{String, Long}] = ...
// 将数据集的格式转换成[Text, LongWritable] 类型
val words = dataset.map( ... )
// 定义HadoopOutputFormat
val hadoopOutputFormat = new HadoopOutputFormat[ Text, LongWritable ](
new TextOutputFormat[ Text, LongWritable ] ,
new JobConf
)
// 指定输出路径
FileOutputFormat.setOutputPath( hadoopOutputFormat .getJobConf , new Path(resultPath) )
// 调用 Output 方法将数据写入Hadoop文件系统
words.output(hadoopOutputFormat )