欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink DataSet API

程序员文章站 2022-07-14 13:49:24
...

Data Source

Data Source 创建初始数据集。Flink 附带了几种内置输入格式,可以从通用文件格式创建数据集。ExecutionEnvironment 上有创建的方法。

基于文件的:

  • readTextFile(path) / TextInputFormat,按行读取文件并将其作为字符串返回。

  • readTextFileWithValue(path) / TextValueInputFormat,按行读取文件并将其作为 StringValues 返回。StringValues是可变字符串。

  • readCsvFile(path) / CsvInputFormat,按行读取文件,解析逗号(或其他字符)分隔,并返回元组或对象的数据集。

  • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat,解析基本数据类型(字符串或整数)的文件,以换行符(或其他字符)分隔。

  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat,从指定路径读取解析 SequenceFile,并返回 <key, value> 元组。

基于集合:

  • fromCollection(Seq),用对象序列创建数据集。集合中的所有元素必须属于同一类型。

  • fromCollection(Iterator),用迭代器创建数据集。指定迭代器返回的元素的数据类型。

  • fromElements(elements: _*),根据给定的对象序列创建数据集。所有对象必须属于同一类型。

  • fromParallelCollection(SplittableIterator),并行地从迭代器创建数据集。指定迭代器返回的元素的数据类型。

  • generateSequence(from, to),并行生成给定间隔的数字序列。

通用:

  • readFile(inputFormat, path) / FileInputFormat,接受文件输入格式。

  • createInput(inputFormat) / InputFormat,接受通用输入格式。

示例代码:

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file")

读压缩文件

Flink 目前支持输入文件的透明解压缩,如果文件标有适当的文件扩展名。这意味着不需要进一步配置输入格式,并且任何 FileInputFormat 支持压缩,包括自定义输入格式。压缩文件可能无法并行读取,从而影响作业可伸缩性。当前支持的压缩方法:

压缩方法 文件扩展名 可并行
DEFLATE .deflate no / not
GZip .gz,.gzip no / not
Bzip2 .bz2 no / not
XZ .xz no / not

Transformation

DataSet 的基础算子与 DataStream 算子大致相同,可以互作参考:

DataSet

Map
一个数据元生成一个新的数据元

data.map { x => x.toInt }

FlatMap
一个数据元生成多个数据元(可以为0)

data.flatMap { str => str.split(" ") }

MapPartition
函数处理包含一个分区所有数据的“迭代器”,可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和先前的算子操作。

data.mapPartition { in => in map { (_, 1) } }

Filter
每个数据元执行布尔函数,只保存函数返回 true 的数据元。

data.filter { _ > 1000 }

Distinct
对数据集中的元素除重并返回新的数据集。

val input: DataSet[(Int, String, Double)] = // [...]
val output = input.distinct()

// Distinct with field position keys
val input: DataSet[(Int, Double, String)] = // [...]
val output = input.distinct(0,2)

// Distinct with KeySelector function
val input: DataSet[Int] = // [...]
val output = input.distinct {x => Math.abs(x)}

// Distinct with key expression
case class CustomType(aName : String, aNumber : Int) { }

val input: DataSet[CustomType] = // [...]
val output = input.distinct("aName", "aNumber")

Reduce
作用于整个 DataSet,合并该数据集的元素。

val intNumbers = env.fromElements(1,2,3)
// 输出 6
val sum = intNumbers.reduce (_ + _)

Aggregate
对一组数据求聚合值,聚合可以应用于完整数据集或分组数据集。聚合转换只能应用于元组(Tuple)数据集,并且仅支持字段位置键进行分组。有一些常用的聚合算子,提供以下内置聚合函数(Aggregations):

  • Sum
  • Min
  • Max
val input: DataSet[(Int, String, Double)] = env.fromElements(
  (1, "a", 10d), (1, "b", 20d), (2, "a", 30d), (3, "c", 50d)
)

val output: DataSet[(Int, String, Double)] = input.aggregate(Aggregations.SUM, 0).aggregate(Aggregations.MIN, 2)

// 输出 (7,c,50.0)


// 简化语法
val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)

MinBy / MaxBy
取元组数据集中指定一个或多个字段的值最小(最大)的元组,可以应用于完整数据集或分组数据集。用于比较的字段必须可比较的。如果多个元组具有最小(最大)字段值,则返回这些元组的任意元组。

val input: DataSet[(Int, String, Double)] = env.fromElements(
  (1, "b", 20d), (1, "a", 10d), (2, "a", 30d)
)

// 比较元组的第一个字段
val output: DataSet[(Int, String, Double)] = input.minBy(0)
// 输出 (1,b,20.0)


// 比较元组的第一、三个字段
val output: DataSet[(Int, String, Double)] = input.minBy(0,2)
// 输出 (1,a,10d)

Grouped DataSet

Grouped DataSet 方法 groupBy() 用来将数据分组,有多种数据分组方法:

  • 对于 Pojo 类型,可以根据 KeyExpression 或 KeySelector 分区
class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  }
  // [...]
}

val words: DataSet[WC] = // [...]

// Grouped by Key Expression
val wordCounts1 = words.groupBy("word")

// Grouped by KeySelector Function
val wordCounts2 = words.groupBy { _.word } 
  • 对于元组(Tuple)类型,可以根据字段位置分组
val tuples = DataSet[(String, Int, Double)] = // [...]

// 根据元组的第一和第二个字段分组
val reducedTuples = tuples.groupBy(0, 1)
  • 分组并排序:
val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).sortGroup(1, Order.ASCENDING)

Reduce
作用于整个分组元素,合并该组的所有元素。

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce {
  (w1, w2) => new WC(w1.word, w1.count + w2.count)
}

ReduceGroup
通过将此数据集中的所有元素传递给函数,创建一个新的数据集。该函数可以使用收集器输出零个或多个元素。也可以作用与完整数据集,迭代器会返回完整数据集的元素。

val input: DataSet[(Int, String)] = env.fromElements((1, "a"), (1, "b"), (2, "a"), (3, "c"))
val output = input.groupBy(0).reduceGroup {
    (in, out: Collector[(Int, String)]) =>
        out.collect(in.reduce((x, y) => (x._1, x._2 + y._2)))
}

// 输出 (3,c),(1,ab),(2,a)

Aggregate
聚合可以应用于分组数据集。

val input: DataSet[(Int, String, Double)] = env.fromElements(
      (1, "a", 10d), (1, "b", 20d), (2, "a", 30d), (3, "c", 50d)
    )

val output: DataSet[(Int, String, Double)] = input.groupBy(1).sum(0).max(2)

// 输出 (3,a,50.0)

MinBy / MaxBy
可以应用于分组数据集

 val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[(Int, String, Double)] = env.fromElements(
  (1, "b", 20d), (1, "a", 10d), (2, "a", 30d)
)

// 按第二个字段分组,取第一、三个字段最小的元组
val output: DataSet[(Int, String, Double)] = input.groupBy(1)
  .minBy(0, 2)

// 输出 (1,a,10.0),(1,b,20.0)

Join

将两个 DataSet 连接生成一个新的 DataSet。

Join
两个数据集指定的要连接的 key,进行 join,默认是一个 inner join。可以使用 JoinFunction 将该组连接元素转化为单个元素,也可以使用 FlatJoinFunction 将该组元素转化为任意多个元素(包括 none)。

// where("0") 表示使用input1的第一个字段连接
// equalTo("1") 表示使用input2的第二个字段,判断等于input1的第一个字段的值
val result = input1.join(input2).where(0).equalTo(1)

可以通过 JoinHint 参数来指定运行时执行连接的方式。参数描述了 join 是通过分区(partitioning)还是广播(broadcasting)发生的,以及使用算法是基于排序(sort-based)的还是基于哈希(hash-based)的。如果没有指定 JoinHint,系统将尝试对输入大小进行评估,并根据这些评估选择最佳策略。

类似 Spark SQL 的 join 逻辑,会根据要连接的两个数据的大小,进行优化。如果不是非常了解那种连接方式在什么场景下更优,建议由系统选择,不要指定。

// 广播input1,并使用 hash table 的方式
val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
  .where(0).equalTo(1)
  
// JoinHint 可选项:
// OPTIMIZER_CHOOSES,由系统判断选择
// BROADCAST_HASH_FIRST,第一个数据集构建哈希表并广播,由第二个表扫描。适用于第一个数据集较小的情况
// BROADCAST_HASH_SECOND,适用于第二个数据集较小的情况
// REPARTITION_HASH_FIRST,对两个数据同时进行分区,并从第一个输入构建哈希表。如果第一个输入小于第二个输入,则此策略很好。 
// REPARTITION_HASH_SECOND,适用于第二个输入小于第一个输入。
// REPARTITION_SORT_MERGE,对两个数据同时进行分区,并对每个输入进行排序(除非数据已经分区或排序)。输入通过已排序输入的流合并来连接。如果已经对一个或两个输入进行过分区排序的情况,则此策略很好。

// REPARTITION_HASH_FIRST 是系统使用的默认回退策略,如果不能进行大小估计,并且不能重新使用预先存在的分区和排序顺序。

为了引导优化器选择正确的执行策略,可以提示要关联的 DataSet 的大小:

val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]

// 表示第二个数据集 input2 特别小
val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)

// 表示第二个数据集 input2 特别大
val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)

对连接成功的数据有两种处理方式(类似 map 和 flatMap),以下面两个数据集连接为例:

case class Rating(name: String, category: String, points: Int)
    
val ratings: DataSet[Ratings] = // [...]
val weights: DataSet[(String, Double)] = // [...]
  • Join with Join Function,接收第一个数据集的一个数据元和第二个数据集的一个数据元,并返回一个数据元
    val weightedRatings = ratings
        .join(weights)
        .where("category")
        .equalTo(0) {
            (rating, weight) => (rating.name, rating.points * weight._2)
        }
    
  • Join with Flat-Join Function,返回零个、一个或多个数据元
    val weightedRatings = ratings
        .join(weights)
        .where("category")
        .equalTo(0) {
            (rating, weight, out: Collector[(String, Double)]) =>
                if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
        }
    

⚠️ Join 仅适用于等于连接的情况,其他连接类型需要使用 OuterJoin 或 CoGroup。

OuterJoin
多两个数据集执行左连接(leftOuterJoin)、右连接(rightOuterJoin)或全外连接(fullOuterJoin)。与 Join(inner join)的区别在于,如果在另一侧没有找到匹配的数据,则保存左侧(或右侧、两侧)的记录。

input1.leftOuterJoin(input2)
      .where(0)              
      .equalTo(1)            
      .with(new JoinFunction<String, String, String>() {
          public String join(String v1, String v2) {
             // NOTE:
             // - v2 might be null for leftOuterJoin
             // - v1 might be null for rightOuterJoin
             // - v1 OR v2 might be null for fullOuterJoin
          }
      });

CoGroup
Reduce 操作的二维变体。对一个或多个字段中的每个输入进行分组,然后加入组。每对组调用转换函数。

val iVals: DataSet[(String, Int)] = env.fromElements(("a", 10), ("b", 20), ("a", 30))
val dVals: DataSet[(String, Double)] = env.fromElements(("a", 1.0), ("b", 2.0), ("c", 3.0))

// iVals 第一个字段与 dVals 第一个字段连接
val output: DataSet[Double] = iVals.coGroup(dVals).where(0).equalTo(0) {
  (iVals, dVals, out: Collector[Double]) =>
    // iVals [("a",10),("a",30)]
    val ints = iVals map {
      _._2
    } toSet

    // dVals [("a", 1.0)]
    for (dVal <- dVals) {
      for (i <- ints) {
        out.collect(dVal._2 * i)
      }
    }
}

// 输出 10.0,30.0,40.0

Union
构建两个数据集的并集。

data.union(data2)

Cross
构建两个输入数据集的笛卡尔积。可选择使用 CrossFunction 将元素对转换为单个元素。

val data1: DataSet[Int] = // [...]
val data2: DataSet[String] = // [...]
val result: DataSet[(Int, String)] = data1.cross(data2)

Cross 是一个计算密集型操作,对大型数据集会带来挑战。建议使用 crossWithTiny()crossWithHuge() 优化。

Repartition

Rebalance
均匀地重新负载数据集的并行分区以消除数据偏差。后面只可以接类似 map 的算子操作。

val in: DataSet[String] = // [...]
val out = in.rebalance().map { ... }

Hash-Partition
根据给定的 key 对数据集做 hash 分区。可以是 position keys,expression keys 或者 key selector functions。

val in: DataSet[(String, Int)] = // [...]
val out = in.partitionByHash(0).mapPartition { ... }

Range-Partition
根据给定的 key 对一个数据集进行 Range 分区。可以是 position keys,expression keys 或者 key selector functions。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionByRange(0).mapPartition { ... }

Custom Partitioning
手动指定数据分区。此方法仅适用于单个字段的 key。

val in: DataSet[(Int, String)] = // [...]
val result = in.partitionCustom(partitioner: Partitioner[K], key)

其他

Sort Partition
本地以指定的顺序在指定的字段上对数据集的所有分区进行排序。可以指定 field position 或 filed expression。

val in: DataSet[(Int, String)] = // [...]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }

First-n
返回数据集的前n个元素。可以应用于任意数据集。

val in: DataSet[(Int, String)] = // [...]
// DataSet
val result1 = in.first(3)
// Grouped DataSet
val result2 = in.groupBy(0).first(3)
// Grouped-sorted DataSet
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

Project
Java API 支持,Scala API 不支持,作用于元组的转换,从元组中选择字段的子集。

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);

Data Sink

Data Sink 从 DataSet 中取出数据保存或者返回。Flink 各种内置的输出格式,在 DataSet 上的算子操作后面调用:

  • writeAsText() / TextOutputFormat,将元素以字符串形式写入文件。字符串通过调用每个元素的 toString() 方法获得。

  • writeAsCsv(...) / CsvOutputFormat,将元组字段以逗号分隔写入文件。行和字段分隔符是可配置的。每个字段的值来自对象的 toString() 方法。

  • print() / printToErr(),在标准输出/标准错误输出中打印每个元素的 toString() 返回值。

  • write() / FileOutputFormat,自定义文件输出的方法和基类。支持自定义对象到字节转换。

  • output() / OutputFormat,通用输出方法,用于非基于文件的数据接收器。

示例代码:

// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file")

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")

本地排序输出

可以使用元组字段位置(field position)或字段表达式(field expression)在指定字段上对数据接收器的输出进行本地排序。这适用于每种输出格式。

尚不支持全局排序的输出。

val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print()

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...)

Iteration

迭代在 Flink 程序中实现循环。迭代运算符封装了程序的一部分并重复执行,将一次迭代的结果(部分结果)反馈到下一次迭代中。Flink 两种迭代类型:==BulkIteration== 和 ==DeltaIteration==。

批量迭代(Bulk Iteration)

调用 DataSet 的 iterate(int) 方法创建一个 BulkIteration,迭代以此为起点,返回一个 IterativeDataSet,可以用常规运算符进行转换。迭代调用的参数 int 指定最大迭代次数。

IterativeDataSet 调用 closeWith(DataSet) 方法来指定哪个转换应该反馈到下一个迭代,可以选择使用 closeWith(DataSet,DataSet) 指定终止条件。如果该 DataSet 为空,则它将评估第二个 DataSet 并终止迭代。如果没有指定终止条件,则迭代在给定的最大次数迭代后终止。

以下示例迭代地估计数量Pi。目标是计算落入单位圆的随机点数。在每次迭代中,挑选一个随机点。如果此点位于单位圆内,增加计数。然后估计 Pi 作为结果计数除以迭代次数乘以4。

val env = ExecutionEnvironment.getExecutionEnvironment()

val initial = env.fromElements(0)

val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
  val result = iterationInput.map { i =>
    val x = Math.random()
    val y = Math.random()
    i + (if (x * x + y * y < 1) 1 else 0)
  }
  result
}

val result = count map { c => c / 10000.0 * 4 }

result.print()

env.execute("Iterative Pi Example")

可以查看 K-Means示例,该示例使用 BulkIteration 来聚类一组未标记的点。

增量迭代(Delta Iteration)

DeltaIteration 利用了某些算法在每次迭代中不会更改解的每个数据点的特点。

除了在每次迭代中反馈的部分解决方案之外,还在迭代中维护状态,可以通过增量更新。迭代计算的结果是最后一次迭代之后的状态。参考 迭代的基本原理

定义 DeltaIteration 类似于定义 BulkIteration。两个数据集构成每次迭代的输入(工作集和解集),并且在每次迭代中生成两个数据集作为结果(新工作集,增量解集)。

调用初始解决方案集的 iterateDelta(initialWorkset, maxIterations, key) 方法创建一个 DeltaIteration:

val initialSolutionSet: DataSet[(Long, Double)] = // [...]

val initialWorkset: DataSet[(Long, Double)] = // [...]

val maxIterations = 100
val keyPosition = 0

val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

    val nextWorkset = deltas.filter(new FilterByThreshold())

    (deltas, nextWorkset)
}

result.writeAsCsv(outputPath)

env.execute()

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/

上一篇: JDK1.7新特性

下一篇: 集成方法