Flink DataSet API
Data Source
Data Source 创建初始数据集。Flink 附带了几种内置输入格式,可以从通用文件格式创建数据集。ExecutionEnvironment
上有创建的方法。
基于文件的:
readTextFile(path) / TextInputFormat
,按行读取文件并将其作为字符串返回。readTextFileWithValue(path) / TextValueInputFormat
,按行读取文件并将其作为 StringValues 返回。StringValues是可变字符串。readCsvFile(path) / CsvInputFormat
,按行读取文件,解析逗号(或其他字符)分隔,并返回元组或对象的数据集。readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat
,解析基本数据类型(字符串或整数)的文件,以换行符(或其他字符)分隔。readSequenceFile(Key, Value, path) / SequenceFileInputFormat
,从指定路径读取解析 SequenceFile,并返回 <key, value> 元组。
基于集合:
fromCollection(Seq)
,用对象序列创建数据集。集合中的所有元素必须属于同一类型。fromCollection(Iterator)
,用迭代器创建数据集。指定迭代器返回的元素的数据类型。fromElements(elements: _*)
,根据给定的对象序列创建数据集。所有对象必须属于同一类型。fromParallelCollection(SplittableIterator)
,并行地从迭代器创建数据集。指定迭代器返回的元素的数据类型。generateSequence(from, to)
,并行生成给定间隔的数字序列。
通用:
readFile(inputFormat, path) / FileInputFormat
,接受文件输入格式。createInput(inputFormat) / InputFormat
,接受通用输入格式。
示例代码:
val env = ExecutionEnvironment.getExecutionEnvironment
// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")
// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")
// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
"hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field
// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
"hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field
// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
"hdfs:///the/CSV/file",
pojoFields = Array("name", "age", "zipcode"))
// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")
// generate a number sequence
val numbers = env.generateSequence(1, 10000000)
// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
"hdfs://nnHost:nnPort/path/to/file")
读压缩文件
Flink 目前支持输入文件的透明解压缩,如果文件标有适当的文件扩展名。这意味着不需要进一步配置输入格式,并且任何 FileInputFormat 支持压缩,包括自定义输入格式。压缩文件可能无法并行读取,从而影响作业可伸缩性。当前支持的压缩方法:
压缩方法 | 文件扩展名 | 可并行 |
---|---|---|
DEFLATE | .deflate | no / not |
GZip | .gz,.gzip | no / not |
Bzip2 | .bz2 | no / not |
XZ | .xz | no / not |
Transformation
DataSet 的基础算子与 DataStream 算子大致相同,可以互作参考:
DataSet
Map
一个数据元生成一个新的数据元
data.map { x => x.toInt }
FlatMap
一个数据元生成多个数据元(可以为0)
data.flatMap { str => str.split(" ") }
MapPartition
函数处理包含一个分区所有数据的“迭代器”,可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和先前的算子操作。
data.mapPartition { in => in map { (_, 1) } }
Filter
每个数据元执行布尔函数,只保存函数返回 true 的数据元。
data.filter { _ > 1000 }
Distinct
对数据集中的元素除重并返回新的数据集。
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.distinct()
// Distinct with field position keys
val input: DataSet[(Int, Double, String)] = // [...]
val output = input.distinct(0,2)
// Distinct with KeySelector function
val input: DataSet[Int] = // [...]
val output = input.distinct {x => Math.abs(x)}
// Distinct with key expression
case class CustomType(aName : String, aNumber : Int) { }
val input: DataSet[CustomType] = // [...]
val output = input.distinct("aName", "aNumber")
Reduce
作用于整个 DataSet,合并该数据集的元素。
val intNumbers = env.fromElements(1,2,3)
// 输出 6
val sum = intNumbers.reduce (_ + _)
Aggregate
对一组数据求聚合值,聚合可以应用于完整数据集或分组数据集。聚合转换只能应用于元组(Tuple)数据集,并且仅支持字段位置键进行分组。有一些常用的聚合算子,提供以下内置聚合函数(Aggregations
):
- Sum
- Min
- Max
val input: DataSet[(Int, String, Double)] = env.fromElements(
(1, "a", 10d), (1, "b", 20d), (2, "a", 30d), (3, "c", 50d)
)
val output: DataSet[(Int, String, Double)] = input.aggregate(Aggregations.SUM, 0).aggregate(Aggregations.MIN, 2)
// 输出 (7,c,50.0)
// 简化语法
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
MinBy / MaxBy
取元组数据集中指定一个或多个字段的值最小(最大)的元组,可以应用于完整数据集或分组数据集。用于比较的字段必须可比较的。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。
val input: DataSet[(Int, String, Double)] = env.fromElements(
(1, "b", 20d), (1, "a", 10d), (2, "a", 30d)
)
// 比较元组的第一个字段
val output: DataSet[(Int, String, Double)] = input.minBy(0)
// 输出 (1,b,20.0)
// 比较元组的第一、三个字段
val output: DataSet[(Int, String, Double)] = input.minBy(0,2)
// 输出 (1,a,10d)
Grouped DataSet
Grouped DataSet 方法 groupBy()
用来将数据分组,有多种数据分组方法:
- 对于 Pojo 类型,可以根据 KeyExpression 或 KeySelector 分区
class WC(val word: String, val count: Int) {
def this() {
this(null, -1)
}
// [...]
}
val words: DataSet[WC] = // [...]
// Grouped by Key Expression
val wordCounts1 = words.groupBy("word")
// Grouped by KeySelector Function
val wordCounts2 = words.groupBy { _.word }
- 对于元组(Tuple)类型,可以根据字段位置分组
val tuples = DataSet[(String, Int, Double)] = // [...]
// 根据元组的第一和第二个字段分组
val reducedTuples = tuples.groupBy(0, 1)
- 分组并排序:
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).sortGroup(1, Order.ASCENDING)
Reduce
作用于整个分组元素,合并该组的所有元素。
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce {
(w1, w2) => new WC(w1.word, w1.count + w2.count)
}
ReduceGroup
通过将此数据集中的所有元素传递给函数,创建一个新的数据集。该函数可以使用收集器输出零个或多个元素。也可以作用与完整数据集,迭代器会返回完整数据集的元素。
val input: DataSet[(Int, String)] = env.fromElements((1, "a"), (1, "b"), (2, "a"), (3, "c"))
val output = input.groupBy(0).reduceGroup {
(in, out: Collector[(Int, String)]) =>
out.collect(in.reduce((x, y) => (x._1, x._2 + y._2)))
}
// 输出 (3,c),(1,ab),(2,a)
Aggregate
聚合可以应用于分组数据集。
val input: DataSet[(Int, String, Double)] = env.fromElements(
(1, "a", 10d), (1, "b", 20d), (2, "a", 30d), (3, "c", 50d)
)
val output: DataSet[(Int, String, Double)] = input.groupBy(1).sum(0).max(2)
// 输出 (3,a,50.0)
MinBy / MaxBy
可以应用于分组数据集
val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[(Int, String, Double)] = env.fromElements(
(1, "b", 20d), (1, "a", 10d), (2, "a", 30d)
)
// 按第二个字段分组,取第一、三个字段最小的元组
val output: DataSet[(Int, String, Double)] = input.groupBy(1)
.minBy(0, 2)
// 输出 (1,a,10.0),(1,b,20.0)
Join
将两个 DataSet 连接生成一个新的 DataSet。
Join
两个数据集指定的要连接的 key,进行 join,默认是一个 inner join。可以使用 JoinFunction
将该组连接元素转化为单个元素,也可以使用 FlatJoinFunction
将该组元素转化为任意多个元素(包括 none)。
// where("0") 表示使用input1的第一个字段连接
// equalTo("1") 表示使用input2的第二个字段,判断等于input1的第一个字段的值
val result = input1.join(input2).where(0).equalTo(1)
可以通过 JoinHint
参数来指定运行时执行连接的方式。参数描述了 join 是通过分区(partitioning)还是广播(broadcasting)发生的,以及使用算法是基于排序(sort-based)的还是基于哈希(hash-based)的。如果没有指定 JoinHint
,系统将尝试对输入大小进行评估,并根据这些评估选择最佳策略。
类似 Spark SQL 的 join 逻辑,会根据要连接的两个数据的大小,进行优化。如果不是非常了解那种连接方式在什么场景下更优,建议由系统选择,不要指定。
// 广播input1,并使用 hash table 的方式
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
.where(0).equalTo(1)
// JoinHint 可选项:
// OPTIMIZER_CHOOSES,由系统判断选择
// BROADCAST_HASH_FIRST,第一个数据集构建哈希表并广播,由第二个表扫描。适用于第一个数据集较小的情况
// BROADCAST_HASH_SECOND,适用于第二个数据集较小的情况
// REPARTITION_HASH_FIRST,对两个数据同时进行分区,并从第一个输入构建哈希表。如果第一个输入小于第二个输入,则此策略很好。
// REPARTITION_HASH_SECOND,适用于第二个输入小于第一个输入。
// REPARTITION_SORT_MERGE,对两个数据同时进行分区,并对每个输入进行排序(除非数据已经分区或排序)。输入通过已排序输入的流合并来连接。如果已经对一个或两个输入进行过分区排序的情况,则此策略很好。
// REPARTITION_HASH_FIRST 是系统使用的默认回退策略,如果不能进行大小估计,并且不能重新使用预先存在的分区和排序顺序。
为了引导优化器选择正确的执行策略,可以提示要关联的 DataSet 的大小:
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]
// 表示第二个数据集 input2 特别小
val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)
// 表示第二个数据集 input2 特别大
val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)
对连接成功的数据有两种处理方式(类似 map 和 flatMap),以下面两个数据集连接为例:
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]
- Join with Join Function,接收第一个数据集的一个数据元和第二个数据集的一个数据元,并返回一个数据元
val weightedRatings = ratings .join(weights) .where("category") .equalTo(0) { (rating, weight) => (rating.name, rating.points * weight._2) }
- Join with Flat-Join Function,返回零个、一个或多个数据元
val weightedRatings = ratings .join(weights) .where("category") .equalTo(0) { (rating, weight, out: Collector[(String, Double)]) => if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2) }
⚠️ Join 仅适用于等于连接的情况,其他连接类型需要使用 OuterJoin 或 CoGroup。
OuterJoin
多两个数据集执行左连接(leftOuterJoin)、右连接(rightOuterJoin)或全外连接(fullOuterJoin)。与 Join(inner join)的区别在于,如果在另一侧没有找到匹配的数据,则保存左侧(或右侧、两侧)的记录。
input1.leftOuterJoin(input2)
.where(0)
.equalTo(1)
.with(new JoinFunction<String, String, String>() {
public String join(String v1, String v2) {
// NOTE:
// - v2 might be null for leftOuterJoin
// - v1 might be null for rightOuterJoin
// - v1 OR v2 might be null for fullOuterJoin
}
});
CoGroup
Reduce 操作的二维变体。对一个或多个字段中的每个输入进行分组,然后加入组。每对组调用转换函数。
val iVals: DataSet[(String, Int)] = env.fromElements(("a", 10), ("b", 20), ("a", 30))
val dVals: DataSet[(String, Double)] = env.fromElements(("a", 1.0), ("b", 2.0), ("c", 3.0))
// iVals 第一个字段与 dVals 第一个字段连接
val output: DataSet[Double] = iVals.coGroup(dVals).where(0).equalTo(0) {
(iVals, dVals, out: Collector[Double]) =>
// iVals [("a",10),("a",30)]
val ints = iVals map {
_._2
} toSet
// dVals [("a", 1.0)]
for (dVal <- dVals) {
for (i <- ints) {
out.collect(dVal._2 * i)
}
}
}
// 输出 10.0,30.0,40.0
Union
构建两个数据集的并集。
data.union(data2)
Cross
构建两个输入数据集的笛卡尔积。可选择使用 CrossFunction
将元素对转换为单个元素。
val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)
Cross 是一个计算密集型操作,对大型数据集会带来挑战。建议使用
crossWithTiny()
和crossWithHuge()
优化。
Repartition
Rebalance
均匀地重新负载数据集的并行分区以消除数据偏差。后面只可以接类似 map 的算子操作。
val in: DataSet[String] = // [...]
val out = in.rebalance().map { ... }
Hash-Partition
根据给定的 key 对数据集做 hash 分区。可以是 position keys,expression keys 或者 key selector functions。
val in: DataSet[(String, Int)] = // [...]
val out = in.partitionByHash(0).mapPartition { ... }
Range-Partition
根据给定的 key 对一个数据集进行 Range 分区。可以是 position keys,expression keys 或者 key selector functions。
val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }
Custom Partitioning
手动指定数据分区。此方法仅适用于单个字段的 key。
val in: DataSet[(Int, String)] = // [...]
val result = in.partitionCustom(partitioner: Partitioner[K], key)
其他
Sort Partition
本地以指定的顺序在指定的字段上对数据集的所有分区进行排序。可以指定 field position 或 filed expression。
val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
First-n
返回数据集的前n个元素。可以应用于任意数据集。
val in: DataSet[(Int, String)] = // [...]
// DataSet
val result1 = in.first(3)
// Grouped DataSet
val result2 = in.groupBy(0).first(3)
// Grouped-sorted DataSet
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
Project
Java API 支持,Scala API 不支持,作用于元组的转换,从元组中选择字段的子集。
DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);
Data Sink
Data Sink 从 DataSet 中取出数据保存或者返回。Flink 各种内置的输出格式,在 DataSet 上的算子操作后面调用:
writeAsText() / TextOutputFormat
,将元素以字符串形式写入文件。字符串通过调用每个元素的toString()
方法获得。writeAsCsv(...) / CsvOutputFormat
,将元组字段以逗号分隔写入文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()
方法。print() / printToErr()
,在标准输出/标准错误输出中打印每个元素的toString()
返回值。write() / FileOutputFormat
,自定义文件输出的方法和基类。支持自定义对象到字节转换。output() / OutputFormat
,通用输出方法,用于非基于文件的数据接收器。
示例代码:
// text data
val textData: DataSet[String] = // [...]
// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")
// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")
// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)
// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file")
// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
.writeAsText("file:///path/to/the/result/file")
本地排序输出
可以使用元组字段位置(field position)或字段表达式(field expression)在指定字段上对数据接收器的输出进行本地排序。这适用于每种输出格式。
尚不支持全局排序的输出。
val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]
// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print()
// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()
// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)
// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)
// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...)
Iteration
迭代在 Flink 程序中实现循环。迭代运算符封装了程序的一部分并重复执行,将一次迭代的结果(部分结果)反馈到下一次迭代中。Flink 两种迭代类型:==BulkIteration== 和 ==DeltaIteration==。
批量迭代(Bulk Iteration)
调用 DataSet 的 iterate(int)
方法创建一个 BulkIteration,迭代以此为起点,返回一个 IterativeDataSet,可以用常规运算符进行转换。迭代调用的参数 int 指定最大迭代次数。
IterativeDataSet 调用 closeWith(DataSet)
方法来指定哪个转换应该反馈到下一个迭代,可以选择使用 closeWith(DataSet,DataSet)
指定终止条件。如果该 DataSet 为空,则它将评估第二个 DataSet 并终止迭代。如果没有指定终止条件,则迭代在给定的最大次数迭代后终止。
以下示例迭代地估计数量Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果此点位于单位圆内,增加计数。然后估计 Pi 作为结果计数除以迭代次数乘以4。
val env = ExecutionEnvironment.getExecutionEnvironment()
val initial = env.fromElements(0)
val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
val result = iterationInput.map { i =>
val x = Math.random()
val y = Math.random()
i + (if (x * x + y * y < 1) 1 else 0)
}
result
}
val result = count map { c => c / 10000.0 * 4 }
result.print()
env.execute("Iterative Pi Example")
可以查看 K-Means示例,该示例使用 BulkIteration 来聚类一组未标记的点。
增量迭代(Delta Iteration)
DeltaIteration 利用了某些算法在每次迭代中不会更改解的每个数据点的特点。
除了在每次迭代中反馈的部分解决方案之外,还在迭代中维护状态,可以通过增量更新。迭代计算的结果是最后一次迭代之后的状态。参考 迭代的基本原理。
定义 DeltaIteration 类似于定义 BulkIteration。两个数据集构成每次迭代的输入(工作集和解集),并且在每次迭代中生成两个数据集作为结果(新工作集,增量解集)。
调用初始解决方案集的 iterateDelta(initialWorkset, maxIterations, key)
方法创建一个 DeltaIteration:
val initialSolutionSet: DataSet[(Long, Double)] = // [...]
val initialWorkset: DataSet[(Long, Double)] = // [...]
val maxIterations = 100
val keyPosition = 0
val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
(solution, workset) =>
val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())
val nextWorkset = deltas.filter(new FilterByThreshold())
(deltas, nextWorkset)
}
result.writeAsCsv(outputPath)
env.execute()
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/