欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink DataSet API

程序员文章站 2022-07-14 13:51:32
...

一.简介

DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理。Flink先将接入数据(如可以通过读取文本或从本地集合)来创建转换成DataSet数据集,并行分布在集群的每个节点上;然后将DataSet数据集进行各种转换操作(map,filter,union,group等)最后通过DataSink操作将结果数据集输出到外部系统。

流程

  • 获得一个执行环境(ExecutionEnvironment)
  • 加载/创建初始数据 (Source)
  • 指定转换算子操作数据(Transformation)
  • 指定存放结果位置(Sink)

二.示例

广播变量

flink 支持广播变量,就是将数据广播到具体taskManager上,数据存储在内存中,这样可以减缓大量的shuffle操作。

def setBroadcast(env: ExecutionEnvironment): Unit ={
  import org.apache.flink.api.scala._
  val toBroadcast  = env.fromElements(1,2,3)
  val data = env.fromElements("a","b")
  /**
   * RichMapFunction 富函数上下文信息
   */
  val result = data.map(new RichMapFunction[String,String](){
    var mList: mutable.Buffer[String] = _
    override def open(config: Configuration): Unit = {
      import scala.collection.JavaConverters._
      mList  = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
    }
    override def map(in: String): String = {
      in +"--->广播数据"+mList.toString()
    }
  }).withBroadcastSet(toBroadcast,"broadcastSetName")
  result.print()
}

读文件

def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val filePath = "D:\\workspace\\open_source\\flinkmoduletest\\src\\main\\resources\\file.txt"
  val line =  env.readTextFile(filePath)
  import org.apache.flink.api.scala._
  val value = line.flatMap(x=>{
    x.split(" ")
  })

  println("函数")
  line.flatMap(new MyFun).collect().foreach(println(_))
}
class MyFun extends FlatMapFunction[String, String]{
  override def flatMap(value: String, out: Collector[String]): Unit = {
    val s = value.split(" ")
    for (e <- s){
      out.collect(e)
    }
  }
}
def fromCollection(env: ExecutionEnvironment) ={
  import org.apache.flink.api.scala._
  val data = 1 to 10
  env.fromCollection(data).print()
}

存储文件

val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = 1 to 10
val text = env.fromCollection(data)
val filePath = "fileResult.txt"
text.writeAsText(filePath,WriteMode.OVERWRITE)
env.execute("SinkApp")

常用转换算子

/**
 * 笛卡尔积
 * @param env
 */
def crossFunction(env:ExecutionEnvironment): Unit ={
  val info1 = List("曼联","曼城")
  val info2 = List(3,1,0)
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.cross(data2).print()
}
/**
 *  全连接
 * @param env
 */
def outerFullOuterJoinFunction(env:ExecutionEnvironment): Unit ={
  val info1 = ListBuffer[(Int, String)]() // 编号  名字
  info1.append((1,"PK哥"))
  info1.append((2,"J哥"))
  info1.append((3,"小队长"))
  info1.append((4,"猪头呼"))
  val info2 = ListBuffer[(Int, String)]() // 编号  城市
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "成都"))
  info2.append((5, "杭州"))
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
    if (first == null) {
      (second._1, "-", second._2)
    } else if (second == null) {
      (first._1, first._2, "-")
    } else {
      (first._1, first._2, second._2)
    }
  }).print()
}
/**
 * 左外连接
 * @param env
 */
def outerLeftOuterJoinFunction(env:ExecutionEnvironment): Unit ={
  val info1 = ListBuffer[(Int, String)]() // 编号  名字
  info1.append((1,"PK哥"))
  info1.append((2,"J哥"))
  info1.append((3,"小队长"))
  info1.append((4,"猪头呼"))
  val info2 = ListBuffer[(Int, String)]() // 编号  城市
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "成都"))
  info2.append((5, "杭州"))
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
    if(second == null){
      (first._1,first._2,"-")
    }else{
      (first._1,first._2,second._2)
    }
  }).print()
}
/**
 * 右外连接
 * @param env
 */
def outerRightOuterJoin(env:ExecutionEnvironment): Unit ={
  val info1 = ListBuffer[(Int, String)]() // 编号  名字
  info1.append((1,"PK哥"))
  info1.append((2,"J哥"))
  info1.append((3,"小队长"))
  info1.append((4,"猪头呼"))
  val info2 = ListBuffer[(Int, String)]() // 编号  城市
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "成都"))
  info2.append((5, "杭州"))
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
    if(first == null){
      (second._1,"-",second._2)
    }else{
      (first._1,first._2,second._2)
    }
  }).print()
}
/**
 * 内连接
 * @param env
 */
def joinFunction(env:ExecutionEnvironment): Unit ={
  val info1 = ListBuffer[(Int, String)]() // 编号  名字
  info1.append((1, "P哥"));
  info1.append((2, "J哥"));
  info1.append((3, "小队长"));
  info1.append((4, "猪头呼"));
  val info2 = ListBuffer[(Int, String)]() // 编号  城市
  info2.append((1, "北京"));
  info2.append((2, "上海"));
  info2.append((3, "成都"));
  info2.append((5, "杭州"));
  import org.apache.flink.api.scala._

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  //where(0).equalTo(0) 分别代表两个数据集要进行join的字段
  //(first, second) 分别代表的两个数据集
  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
    (first._1, first._2, second._2)
  }).print()
}
/**
 * 去重
 */
def distinctFunction(env:ExecutionEnvironment): Unit ={
  val  info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  import org.apache.flink.api.scala._
  val data = env.fromCollection(info)

  data.flatMap(_.split(",")).distinct().print()
}
/**
 * flatMap 将一个拆分多个
 * @param env
 */
def flatMapFunction(env: ExecutionEnvironment): Unit ={
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  import org.apache.flink.api.scala._
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
}
/**
 * 转换算子  first获取前n个元素
 * @param env
 */
def firstFunction(env: ExecutionEnvironment): Unit ={
  val info = ListBuffer[(Int, String)]()
  info.append((1, "Hadoop"))
  info.append((1, "Spark"))
  info.append((1, "Flink"))
  info.append((2, "Java"))
  info.append((2, "Spring Boot"))
  info.append((3, "Linux"))
  info.append((4, "VUE"))
  import org.apache.flink.api.scala._
  val data = env.fromCollection(info)
  println("first")
  data.first(5).print()
  println("first groupby1")
  data.groupBy(0).first(2).print()
  println("first groupby2")
  data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()

}
/**
 * DataSource 100个元素,把结果存储到数据库中
 * @param env
 */
def mapPartitionFunction(env: ExecutionEnvironment): Unit ={
  val students = new ListBuffer[String]
  for (i <- 1 to 100) {
    students.append("student: " + i)
  }
  import org.apache.flink.api.scala._
  //设置并行度
  val data = env.fromCollection(students).setParallelism(5)
  //每个分区获取一个connection
  data.mapPartition(x => {
    val connection = DBUtils.getConection()
    println(connection + "....")
    DBUtils.returnConnection(connection)
    x
  }).print()
}
/**
 * filter 转换算子
 * @param env
 */
def filterFunction(env: ExecutionEnvironment): Unit ={
  import org.apache.flink.api.scala._
  env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    .map(_ + 1)
    .filter(_ > 5)
    .print()
}
/**
 * map 转换算子
 */
def mapFunction(env: ExecutionEnvironment): Unit = {
  import org.apache.flink.api.scala._
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  // 对data中的每个元素都做一个+1的操作
  // data.map((x:Int) => x + 1).print()
  // data.map((x) => x + 1).print()
  // data.map(x => x + 1).print()
  data.map(_ + 1).print()
}

分布式缓存

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。

当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件。

def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val filePath = "fileCache.txt"
  //step1: 注册一个本地/HDFS文件
  env.registerCachedFile(filePath,"pk-scala-dc")
  import org.apache.flink.api.scala._
  val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
  data.map(new RichMapFunction[String,String] {
    // step2:在open方法中获取到分布式缓存的内容即可
    override def open(parameters: Configuration): Unit = {
      val dcFile = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")
      val lines = FileUtils.readLines(dcFile)
      /**
       * 此时会出现一个异常:java集合和scala集合不兼容的问题
       */
      import scala.collection.JavaConverters._
      for (ele <- lines.asScala) {
        println(ele)
      }
    }
    override def map(value: String): String = value
  }).print()
}