Flink 批处理之DataSet
1.批处理流程
- 获取 Flink 批处理执行环境
- 构建 source
- 数据处理
- 构建 sink
3.Data Sources
Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。 flink 在批处理中常见的 source 主要有两大类。
- 基于本地集合的 source(Collection-based-source)
- 基于文件的 source(File-based-source)
3.1基于本地集合的 source(Collection-based-source)
在 flink 最常见的创建 DataSet 方式有三种。
-
使用 env.fromElements(),这种方式也支持 Tuple,自定义对象等复合形式。
-
使用 env.fromCollection(),这种方式支持多种 Collection 的具体类型
-
使用 env.generateSequence()方法创建基于 Sequence 的 DataSet
package hctang.tech.bacth.Bacth
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
object BatchFromCollection {
def main(args: Array[String]): Unit = { //获取 flink 执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//0.用 element 创建 DataSet(fromElements)
val ds0: DataSet[String] = env.fromElements("spark", "flink")
ds0.print()
//1.用 Tuple 创建 DataSet(fromElements)
val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
ds1.print()
//2.用 Array 创建 DataSet
val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
ds2.print()
//3.用 ArrayBuffer 创建 DataSet
val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
ds3.print()
//4.用 List 创建 DataSet
val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
ds4.print()
//5.用 List 创建 DataSet
val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
ds5.print()
//6.用 Vector 创建 DataSet
val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
ds6.print()
//7.用 Queue 创建 DataSet
val ds7: DataSet[String] = env.fromCollection(mutable.Queue("spark", "flink"))
ds7.print()
//8.用 Stack 创建 DataSet
val ds8: DataSet[String] = env.fromCollection(mutable.Stack("spark", "flink"))
ds8.print()
//9.用 Stream 创建 DataSet(Stream 相当于 lazy List,避免在中间过程中生成不必要的集合)
val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
ds9.print()
//10.用 Seq 创建 DataSet
val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
ds10.print()
//11.用 Set 创建 DataSet
val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
ds11.print()
//12.用 Iterable 创建 DataSet
val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
ds12.print()
//13.用 ArraySeq 创建 DataSet
val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
ds13.print()
//14.用 ArrayStack 创建 DataSet
val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
ds14.print()
//15.用 Map 创建 DataSet
val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
ds15.print()
//16.用 Range 创建 DataSet
val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
ds16.print()
//17.用 fromElements 创建 DataSet
val ds17: DataSet[Long] = env.generateSequence(1, 9)
ds17.print()
}
}
3.2基于文件的 source(File-based-source)
3.2.1 读取本地文件
package hctang.tech.bacth.Bacth
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object BatchFromFile {
def main(args:Array[String]):Unit={
//使用readFile 读取本地文件
val environment:ExecutionEnvironment=ExecutionEnvironment.getExecutionEnvironment
val data:DataSet[String]=environment.readTextFile("data/data.txt")
//导入隐式转换
import org.apache.flink.api.scala._
//指定数据的转换
val flatmap_data:DataSet[String]=data.flatMap(Line=>Line.split("\\W+"))
val tuple_data:DataSet[(String,Int)]= flatmap_data.map(line=>(line,1))
tuple_data.print()
val groupData:GroupedDataSet[(String,Int)]=tuple_data.groupBy(line => line._1)
val result:DataSet[(String,Int)]=groupData.reduce((x,y)=>(x._1,x._2+y._2))//统计相同键下的数量
//触发程序执行
result.print()
}
}
3.2.2 读取HDFS文件
package hctang.tech.bacth.Bacth
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object BatchfromHDFSFile {
def main(args: Array[String]): Unit = { //使用 readTextFile 读取本地文件
//初始化环境
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//加载数据
val datas: DataSet[String] = environment.readTextFile("hdfs://localhost:9000/words.txt")//hdfs地址
//导入隐式转换
import org.apache.flink.api.scala._
//指定数据的转化
val flatmap_data: DataSet[String] = datas.flatMap(line => line.split("\\W+"))
val tuple_data: DataSet[(String, Int)] = flatmap_data.map(line => (line , 1))
val groupData: GroupedDataSet[(String, Int)] = tuple_data.groupBy(line => line._1)
val result: DataSet[(String, Int)] = groupData.reduce((x, y) => (x._1 , x._2+y._2))
//触发程序执行
result.print()
}
}
3.2.3 读取CSV文件
package hctang.tech.bacth.Bacth
import org.apache.flink.api.scala.ExecutionEnvironment
object BatchFromCsvFile {
def main(args: Array[String]): Unit = { //初始化环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//导入隐式转换
import org.apache.flink.api.scala._
//加载数据
val datas = env.readCsvFile[(Int,String,String, String,Int, String)](filePath = "/home/tanghc/桌面/hiteamteach/tieba.csv",
lineDelimiter = "\n", //分隔行的字符串,默认为换行。
fieldDelimiter=",", //分隔单个字段的字符串,默认值为“,”
lenient = true, //解析器是否应该忽略格式不正确的行。
ignoreFirstLine = false,//是否应忽略文件中的第一行。
includedFields=Array(0,1,2,3,4,5)
)
//触发程序执行
datas.print()
}
}
3.2.4 目录遍历读取
flink 支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。
对于从文件中读取数据,当读取的数个文件夹的时候,嵌套的文件默认是不会被读取的,只
会读取第一个文件,其他的都会被忽略。所以需要使用 recursive.file.enumeration 进行
递归读取
package hctang.tech.bacth.Bacth
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
object BatchFromFolder {
def main(args: Array[String]): Unit = {
//初始化环境
val env = ExecutionEnvironment.getExecutionEnvironment
val parameters = new Configuration
// recursive.file.enumeration 开启递归
parameters.setBoolean("recursive.file.enumeration", true)
val result = env.readTextFile("data").withParameters(parameters)
//触发程序执行
result.print()
}
}
3.2.5 压缩文件读取
package hctang.tech.bacth.Bacth
import org.apache.flink.api.scala.ExecutionEnvironment
object BatchFromCompressFile {
def main(args:Array[String]):Unit={
val env=ExecutionEnvironment.getExecutionEnvironment
//对于以下格式的压缩文件可以直接对去,不过不支持并行读取,只能顺序读取,会影响性能和作业的伸缩性
//.deflate; .gz;.gzip;.bz2;.xz
val result=env.readTextFile("")
result.print()
}
}
上一篇: 链式队列的实现
下一篇: 数据结构-循环队列的实现
推荐阅读
-
计算机之批处理处理程序
-
DOS批处理之DATE命令的使用方法详解
-
Flink入门(五)——DataSet Api编程指南
-
Flink DataStream API之Operators
-
Flink实战之合并小文件
-
flink学习之六-数据持久化to-kafka
-
(第一篇)pytorch数据预处理三剑客之——Dataset,DataLoader,Transform
-
Flink(18):Flink之累加器
-
1.11.Flink DataSetAPI、DataSet API之Data Sources、DataSet API之Transformations、DataSet Sink部分详解
-
Flink 自定义Sink 之 写入HDFS