欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink 批处理之DataSet

程序员文章站 2022-07-14 13:50:38
...

1.批处理流程

  1. 获取 Flink 批处理执行环境
  2. 构建 source
  3. 数据处理
  4. 构建 sink

3.Data Sources

Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。 flink 在批处理中常见的 source 主要有两大类。

  1. 基于本地集合的 source(Collection-based-source)
  2. 基于文件的 source(File-based-source)

3.1基于本地集合的 source(Collection-based-source)

在 flink 最常见的创建 DataSet 方式有三种。

  • 使用 env.fromElements(),这种方式也支持 Tuple,自定义对象等复合形式。

  • 使用 env.fromCollection(),这种方式支持多种 Collection 的具体类型

  • 使用 env.generateSequence()方法创建基于 Sequence 的 DataSet
    package hctang.tech.bacth.Bacth

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object BatchFromCollection {
  def main(args: Array[String]): Unit = { //获取 flink 执行环境

val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

//0.用 element 创建 DataSet(fromElements)
val ds0: DataSet[String] = env.fromElements("spark", "flink")
ds0.print()

//1.用 Tuple 创建 DataSet(fromElements)
val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
ds1.print()

//2.用 Array 创建 DataSet
val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
ds2.print()

//3.用 ArrayBuffer 创建 DataSet
val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
ds3.print()

//4.用 List 创建 DataSet
val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
ds4.print()

//5.用 List 创建 DataSet
val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
ds5.print()

//6.用 Vector 创建 DataSet
val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
ds6.print()

//7.用 Queue 创建 DataSet
val ds7: DataSet[String] = env.fromCollection(mutable.Queue("spark", "flink"))
ds7.print()

//8.用 Stack 创建 DataSet
val ds8: DataSet[String] = env.fromCollection(mutable.Stack("spark", "flink"))
ds8.print()

//9.用 Stream 创建 DataSet(Stream 相当于 lazy List,避免在中间过程中生成不必要的集合)
val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
ds9.print()

//10.用 Seq 创建 DataSet
val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
ds10.print()

//11.用 Set 创建 DataSet
val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
ds11.print()

//12.用 Iterable 创建 DataSet
val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
ds12.print()

//13.用 ArraySeq 创建 DataSet
val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
ds13.print()

//14.用 ArrayStack 创建 DataSet
val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
ds14.print()

//15.用 Map 创建 DataSet
val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
ds15.print()

//16.用 Range 创建 DataSet
val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
ds16.print()

//17.用 fromElements 创建 DataSet
val ds17: DataSet[Long] = env.generateSequence(1, 9)
ds17.print()
}

}

3.2基于文件的 source(File-based-source)

3.2.1 读取本地文件

 package hctang.tech.bacth.Bacth

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object BatchFromFile {
  def main(args:Array[String]):Unit={
    //使用readFile 读取本地文件
    val environment:ExecutionEnvironment=ExecutionEnvironment.getExecutionEnvironment
    val data:DataSet[String]=environment.readTextFile("data/data.txt")
    //导入隐式转换
        import org.apache.flink.api.scala._
      //指定数据的转换
    val flatmap_data:DataSet[String]=data.flatMap(Line=>Line.split("\\W+"))
    val tuple_data:DataSet[(String,Int)]= flatmap_data.map(line=>(line,1))
    tuple_data.print()
    val groupData:GroupedDataSet[(String,Int)]=tuple_data.groupBy(line => line._1)
    val result:DataSet[(String,Int)]=groupData.reduce((x,y)=>(x._1,x._2+y._2))//统计相同键下的数量
    //触发程序执行
    result.print()

}
}

3.2.2 读取HDFS文件

package hctang.tech.bacth.Bacth

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object BatchfromHDFSFile {
def main(args: Array[String]): Unit = { //使用 readTextFile 读取本地文件

//初始化环境

val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//加载数据
val datas: DataSet[String] = environment.readTextFile("hdfs://localhost:9000/words.txt")//hdfs地址
//导入隐式转换
import org.apache.flink.api.scala._
//指定数据的转化
val flatmap_data: DataSet[String] = datas.flatMap(line => line.split("\\W+"))
val tuple_data: DataSet[(String, Int)] = flatmap_data.map(line => (line , 1))
val groupData: GroupedDataSet[(String, Int)] = tuple_data.groupBy(line => line._1)
val result: DataSet[(String, Int)] = groupData.reduce((x, y) => (x._1 , x._2+y._2))
//触发程序执行
result.print()
}
}

3.2.3 读取CSV文件

package hctang.tech.bacth.Bacth
import org.apache.flink.api.scala.ExecutionEnvironment
object BatchFromCsvFile {
  def main(args: Array[String]): Unit = { //初始化环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //导入隐式转换
    import org.apache.flink.api.scala._
    //加载数据
    val datas = env.readCsvFile[(Int,String,String, String,Int, String)](filePath = "/home/tanghc/桌面/hiteamteach/tieba.csv",
      lineDelimiter = "\n", //分隔行的字符串,默认为换行。
      fieldDelimiter=",", //分隔单个字段的字符串,默认值为“,”
      lenient = true, //解析器是否应该忽略格式不正确的行。
      ignoreFirstLine = false,//是否应忽略文件中的第一行。
      includedFields=Array(0,1,2,3,4,5)
    )
    //触发程序执行
    datas.print()
  }
}

3.2.4 目录遍历读取

flink 支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。
对于从文件中读取数据,当读取的数个文件夹的时候,嵌套的文件默认是不会被读取的,只
会读取第一个文件,其他的都会被忽略。所以需要使用 recursive.file.enumeration 进行
递归读取

package hctang.tech.bacth.Bacth
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
object BatchFromFolder {
  def main(args: Array[String]): Unit = {
    //初始化环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    val parameters = new Configuration
    // recursive.file.enumeration 开启递归
    parameters.setBoolean("recursive.file.enumeration", true)
    val result = env.readTextFile("data").withParameters(parameters)
    //触发程序执行
    result.print()
  }
}

3.2.5 压缩文件读取

package hctang.tech.bacth.Bacth

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchFromCompressFile {
  def main(args:Array[String]):Unit={
    val env=ExecutionEnvironment.getExecutionEnvironment
    //对于以下格式的压缩文件可以直接对去,不过不支持并行读取,只能顺序读取,会影响性能和作业的伸缩性
    //.deflate; .gz;.gzip;.bz2;.xz
    val result=env.readTextFile("")
    result.print()
  }
}
相关标签: Flink 大数据