欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【09】Flink 之 DataSet API(三):DataSet Sink 操作

程序员文章站 2022-07-14 13:49:18
...

1、DataSet Sink 数据输出

在Data Source部分和其他部分使用过写文件和打印操作,代码相同,只对理论进行介绍

1.1、分类

  1. writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
  2. writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法
  3. print():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

2、数据输出详解

  通过对批量数据的读写(DataSource)及转换(Transformation)操作,最终形成用户期望的结果数据集,然后需要将数据写入不同的外部介质中进行存储,进而完成整改批量数据处理过程。Flink中对应数据输出功能被称为DataSinks操作,和DataSource Operator操作类似,为了能够让用户更加灵活地使用外部数据,Flink抽象出通用的OutputFormat接口,批量数据输出全部实现与OutputFormat接口。Flink内置了常用数据存储介质对应的OutputFormat,如HadoopOutputFormat、JDBCOutputFormat等。同样,用户也可以自定义实现OutputFormat接口。
Flink在DataSet API中的数据输出共分为三种类型:

  1. 基于文件实现,对应DataSet的write( )方法;
  2. 基于通用存储介质实现,对应DataSet的output( )方法,如JDBCOutputFormat;
  3. 客户端输出,直接将DataSet数据从不同的节点收集到Client,如print( )方法

  其中,第一种和第三种在之前程序中进行了使用,对第二种进行简单介绍。

2.1、通过输出接口

  可以使用自定义OutputFormat方法来定义介质对应的OutputFormat。

//	读取数据集并转换为(word , count) 类型数据
val dataSet:DataSet[{String,  Long}] = ...
//	将数据集的格式转换成[Text, LongWritable] 类型
val words = dataset.map( ... )
//	定义HadoopOutputFormat
val hadoopOutputFormat = new HadoopOutputFormat[ Text,  LongWritable ](
	new TextOutputFormat[ Text,  LongWritable ] ,
	new JobConf
)
//	指定输出路径
FileOutputFormat.setOutputPath( hadoopOutputFormat .getJobConf , new Path(resultPath) )
//	调用 Output 方法将数据写入Hadoop文件系统
words.output(hadoopOutputFormat  )

下一节:【10】Flink 支持的DataType和序列化