Flink DataSet API
程序员文章站
2022-07-14 13:51:32
...
一.简介
DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理。Flink先将接入数据(如可以通过读取文本或从本地集合)来创建转换成DataSet数据集,并行分布在集群的每个节点上;然后将DataSet数据集进行各种转换操作(map,filter,union,group等)最后通过DataSink操作将结果数据集输出到外部系统。
流程
- 获得一个执行环境(ExecutionEnvironment)
- 加载/创建初始数据 (Source)
- 指定转换算子操作数据(Transformation)
- 指定存放结果位置(Sink)
二.示例
广播变量
flink 支持广播变量,就是将数据广播到具体taskManager上,数据存储在内存中,这样可以减缓大量的shuffle操作。
def setBroadcast(env: ExecutionEnvironment): Unit ={
import org.apache.flink.api.scala._
val toBroadcast = env.fromElements(1,2,3)
val data = env.fromElements("a","b")
/**
* RichMapFunction 富函数上下文信息
*/
val result = data.map(new RichMapFunction[String,String](){
var mList: mutable.Buffer[String] = _
override def open(config: Configuration): Unit = {
import scala.collection.JavaConverters._
mList = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
}
override def map(in: String): String = {
in +"--->广播数据"+mList.toString()
}
}).withBroadcastSet(toBroadcast,"broadcastSetName")
result.print()
}
读文件
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val filePath = "D:\\workspace\\open_source\\flinkmoduletest\\src\\main\\resources\\file.txt"
val line = env.readTextFile(filePath)
import org.apache.flink.api.scala._
val value = line.flatMap(x=>{
x.split(" ")
})
println("函数")
line.flatMap(new MyFun).collect().foreach(println(_))
}
class MyFun extends FlatMapFunction[String, String]{
override def flatMap(value: String, out: Collector[String]): Unit = {
val s = value.split(" ")
for (e <- s){
out.collect(e)
}
}
}
def fromCollection(env: ExecutionEnvironment) ={
import org.apache.flink.api.scala._
val data = 1 to 10
env.fromCollection(data).print()
}
存储文件
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = 1 to 10
val text = env.fromCollection(data)
val filePath = "fileResult.txt"
text.writeAsText(filePath,WriteMode.OVERWRITE)
env.execute("SinkApp")
常用转换算子
/**
* 笛卡尔积
* @param env
*/
def crossFunction(env:ExecutionEnvironment): Unit ={
val info1 = List("曼联","曼城")
val info2 = List(3,1,0)
import org.apache.flink.api.scala._
val data1 = env.fromCollection(info1)
val data2 = env.fromCollection(info2)
data1.cross(data2).print()
}
/**
* 全连接
* @param env
*/
def outerFullOuterJoinFunction(env:ExecutionEnvironment): Unit ={
val info1 = ListBuffer[(Int, String)]() // 编号 名字
info1.append((1,"PK哥"))
info1.append((2,"J哥"))
info1.append((3,"小队长"))
info1.append((4,"猪头呼"))
val info2 = ListBuffer[(Int, String)]() // 编号 城市
info2.append((1, "北京"))
info2.append((2, "上海"))
info2.append((3, "成都"))
info2.append((5, "杭州"))
import org.apache.flink.api.scala._
val data1 = env.fromCollection(info1)
val data2 = env.fromCollection(info2)
data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
if (first == null) {
(second._1, "-", second._2)
} else if (second == null) {
(first._1, first._2, "-")
} else {
(first._1, first._2, second._2)
}
}).print()
}
/**
* 左外连接
* @param env
*/
def outerLeftOuterJoinFunction(env:ExecutionEnvironment): Unit ={
val info1 = ListBuffer[(Int, String)]() // 编号 名字
info1.append((1,"PK哥"))
info1.append((2,"J哥"))
info1.append((3,"小队长"))
info1.append((4,"猪头呼"))
val info2 = ListBuffer[(Int, String)]() // 编号 城市
info2.append((1, "北京"))
info2.append((2, "上海"))
info2.append((3, "成都"))
info2.append((5, "杭州"))
import org.apache.flink.api.scala._
val data1 = env.fromCollection(info1)
val data2 = env.fromCollection(info2)
data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
if(second == null){
(first._1,first._2,"-")
}else{
(first._1,first._2,second._2)
}
}).print()
}
/**
* 右外连接
* @param env
*/
def outerRightOuterJoin(env:ExecutionEnvironment): Unit ={
val info1 = ListBuffer[(Int, String)]() // 编号 名字
info1.append((1,"PK哥"))
info1.append((2,"J哥"))
info1.append((3,"小队长"))
info1.append((4,"猪头呼"))
val info2 = ListBuffer[(Int, String)]() // 编号 城市
info2.append((1, "北京"))
info2.append((2, "上海"))
info2.append((3, "成都"))
info2.append((5, "杭州"))
import org.apache.flink.api.scala._
val data1 = env.fromCollection(info1)
val data2 = env.fromCollection(info2)
data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
if(first == null){
(second._1,"-",second._2)
}else{
(first._1,first._2,second._2)
}
}).print()
}
/**
* 内连接
* @param env
*/
def joinFunction(env:ExecutionEnvironment): Unit ={
val info1 = ListBuffer[(Int, String)]() // 编号 名字
info1.append((1, "P哥"));
info1.append((2, "J哥"));
info1.append((3, "小队长"));
info1.append((4, "猪头呼"));
val info2 = ListBuffer[(Int, String)]() // 编号 城市
info2.append((1, "北京"));
info2.append((2, "上海"));
info2.append((3, "成都"));
info2.append((5, "杭州"));
import org.apache.flink.api.scala._
val data1 = env.fromCollection(info1)
val data2 = env.fromCollection(info2)
//where(0).equalTo(0) 分别代表两个数据集要进行join的字段
//(first, second) 分别代表的两个数据集
data1.join(data2).where(0).equalTo(0).apply((first, second) => {
(first._1, first._2, second._2)
}).print()
}
/**
* 去重
*/
def distinctFunction(env:ExecutionEnvironment): Unit ={
val info = ListBuffer[String]()
info.append("hadoop,spark")
info.append("hadoop,flink")
info.append("flink,flink")
import org.apache.flink.api.scala._
val data = env.fromCollection(info)
data.flatMap(_.split(",")).distinct().print()
}
/**
* flatMap 将一个拆分多个
* @param env
*/
def flatMapFunction(env: ExecutionEnvironment): Unit ={
val info = ListBuffer[String]()
info.append("hadoop,spark")
info.append("hadoop,flink")
info.append("flink,flink")
import org.apache.flink.api.scala._
val data = env.fromCollection(info)
data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
}
/**
* 转换算子 first获取前n个元素
* @param env
*/
def firstFunction(env: ExecutionEnvironment): Unit ={
val info = ListBuffer[(Int, String)]()
info.append((1, "Hadoop"))
info.append((1, "Spark"))
info.append((1, "Flink"))
info.append((2, "Java"))
info.append((2, "Spring Boot"))
info.append((3, "Linux"))
info.append((4, "VUE"))
import org.apache.flink.api.scala._
val data = env.fromCollection(info)
println("first")
data.first(5).print()
println("first groupby1")
data.groupBy(0).first(2).print()
println("first groupby2")
data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
}
/**
* DataSource 100个元素,把结果存储到数据库中
* @param env
*/
def mapPartitionFunction(env: ExecutionEnvironment): Unit ={
val students = new ListBuffer[String]
for (i <- 1 to 100) {
students.append("student: " + i)
}
import org.apache.flink.api.scala._
//设置并行度
val data = env.fromCollection(students).setParallelism(5)
//每个分区获取一个connection
data.mapPartition(x => {
val connection = DBUtils.getConection()
println(connection + "....")
DBUtils.returnConnection(connection)
x
}).print()
}
/**
* filter 转换算子
* @param env
*/
def filterFunction(env: ExecutionEnvironment): Unit ={
import org.apache.flink.api.scala._
env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.map(_ + 1)
.filter(_ > 5)
.print()
}
/**
* map 转换算子
*/
def mapFunction(env: ExecutionEnvironment): Unit = {
import org.apache.flink.api.scala._
val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// 对data中的每个元素都做一个+1的操作
// data.map((x:Int) => x + 1).print()
// data.map((x) => x + 1).print()
// data.map(x => x + 1).print()
data.map(_ + 1).print()
}
分布式缓存
Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。
当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件。
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val filePath = "fileCache.txt"
//step1: 注册一个本地/HDFS文件
env.registerCachedFile(filePath,"pk-scala-dc")
import org.apache.flink.api.scala._
val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
data.map(new RichMapFunction[String,String] {
// step2:在open方法中获取到分布式缓存的内容即可
override def open(parameters: Configuration): Unit = {
val dcFile = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")
val lines = FileUtils.readLines(dcFile)
/**
* 此时会出现一个异常:java集合和scala集合不兼容的问题
*/
import scala.collection.JavaConverters._
for (ele <- lines.asScala) {
println(ele)
}
}
override def map(value: String): String = value
}).print()
}
上一篇: 提升树算法总结(一)
下一篇: 链式队列的实现