欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Apache Flink(五):Flink Operator

程序员文章站 2022-07-14 13:18:03
...

Operator(操作符)

DataStream Transformations 数据流转换

Datastream -> Datasteam

Map(映射)

取一个元素并产生一个元素,是一个映射函数。下面的例子为使用Map将输入流的值加倍

dataStream.map { x => x * 2 }

FlatMap(展开)

取一个元素并产生零个,一个或多个元素。FlatMap可将英文短语拆分为单词

dataStream.flatMap { str => str.split(" ") }

Filter(过滤)

为每个元素评估一个布尔函数,并保留该函数返回true的布尔函数。 过滤出零值的过滤器,即布尔函数的返回值为true则保留

dataStream.filter { _ != 0 }

Union(联合)

两个或多个数据流的并集,创建一个包含所有流中所有元素的新流。 注意:如果将数据流与其自身合并,则在结果流中每个元素将获得两次。

dataStream.union(otherStream1, otherStream2, ...)

DataStream,DataStream → ConnectedStreams

Connect(连接)

Connect两个保留其类型的数据流,从而允许两个流之间共享状态。

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)

CoMap, CoFlatMap

类似于ConnectedStreams上的map和flatMap

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)

案例代码

import org.apache.flink.streaming.api.scala._

object FlinkConnectStream {

  def main(args: Array[String]): Unit = {

    // 1.创建StreamExecutionEnvironment
    val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // 2.创建DataStream
    val ds1 : DataStream[String] = flinkEnv.fromCollection(List("Hello Flink", "Hello Spark", "Hello Scala"))
    val ds2 : DataStream[Int] = flinkEnv.fromCollection(List(2,3,5,7))

    val ds : ConnectedStreams[String, Int] = ds1.connect(ds2)

    ds.flatMap(
      (line:String) => line.split("\\s+"),
      (line:Int) => line.toString
    ).print()

    // 执行计算
    flinkEnv.execute("FlinkConnectStream")

  }

}

DataStream → SplitStream

Split(分割)

根据某个标准将流分成两个或多个流

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Select(选择)

从拆分流中选择一个或多个流

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

案例代码(Split方式已过时)

    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    
    val logStream = fsEnv.socketTextStream("Spark",9999)
    
    val splitStream: SplitStream[String] = logStream.split(new OutputSelector[String] {
        override def select(out: String): lang.Iterable[String] = {
            if (out.startsWith("INFO")) {
                val array = new util.ArrayList[String]()
                array.add("info")
                return array
            } else  {
                val array = new util.ArrayList[String]()
                array.add("error")
                return array
            }
        }
    })
    
    splitStream.select("info").print("info")
    splitStream.select("error").printToErr("error")
    
    fsEnv.execute("ConnectedStream")

用法二(优先)

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object FlinkSplitStream {

  def main(args: Array[String]): Unit = {

    val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment

    val logStream = flinkEnv.socketTextStream("Spark", 6666)

    val opt = new OutputTag[String]("error")

    val dataStream = logStream.process(new ProcessFunction[String, String] {

      override def processElement(value: String, ctx: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit = {

        // ERROR开头,为错误信息
        if (value.startsWith("ERROR")) {
          ctx.output(opt, value)
        } else {
          out.collect(value)
        }

      }

    })

    // 打标记
    dataStream.print("正常信息")
    dataStream.getSideOutput(opt).printToErr("错误信息")

    flinkEnv.execute("FlinkSplitStream")

  }

}

DataStream → KeyedDataStream

KeyBy(按键分组)

在逻辑上将流划分为不相交的分区,每个分区都包含同一键的元素。 在内部,这是通过哈希算法分区实现的。

dataStream.keyBy("someKey") // 通过指定的someKey进行分区
dataStream.keyBy(0) // 通过元组的第一个元素进行分区

Reduce(无初始值的滚动聚合)

对 KeyedDataStream进行滚动聚合。 将当前元素与最后一个Reduce的值合并出新值

dataStream
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(0)
  .reduce((t1, t2) => (t1._1, t1._2 + t2._2))
  .print()

Fold(有初始值的滚动聚合)

带有初始值的 KeyedDataStream的滚动折叠。 将当前元素与上一个Fold的值合并出新值

val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })

Aggregations(滚动聚合)

在KeyedDataStream上滚动聚合。 min和minBy之间的区别是min返回最小值,而minBy返回在此字段中具有最小值的元素(与max和maxBy相同)。

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

Physical partitioning 物理分区

Flink提供了一些分区方案,可供用户选择,分区目的是为了任务之间数据的能够均衡分布。

分区方案 说明 实现方式
Custom partitioning 需要用户实现分区策略 dataStream.partitionCustom(partitioner, “someKey”)
Random partitioning 将当前的数据随机分配给下游任务 dataStream.shuffle()
Rebalancing (Round-robin partitioning) 轮询将上游的数据均分下游任务 dataStream.rebalance()
Rescaling 缩放分区数据,例如上游2个并行度/下游4个 ,上游会将1个分区的数据发送给下游前两个分区,后1个分区,会发送下游后两个 dataStream.rescale()
Broadcasting 上游会将分区所有数据,广播给下游的所有任务分区 dataStream.broadcast()

Task chaining and resource groups 任务链和资源组

连接两个Operator 转换,尝试将两个Operator 转换放置到一个线程当中,可以减少线程消耗,避免不必要的线程通信。用户可以通过 StreamExecutionEnvironment.disableOperatorChaining()来禁用chain操作。

为了方便,Flink提供如下的算子用于修改chain的行为

算子 操作 说明
Start new chain someStream.filter(…).map(…).startNewChain().map(…) 开启新chain,将当前算子和filter断开
Disable chaining someStream.map(…).disableChaining() 当前算子和前后都要断开chain操作
Set slot sharing group someStream.filter(…).slotSharingGroup(“name”) 设置操作任务所属资源Group,影响任务对TaskSlots占用
    val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    //2.创建DataStream -细化
    val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
    
    //3.对数据做转换
    dataStream.filter(line => line.startsWith("INFO"))
    .flatMap(_.split("\\s+"))
    .startNewChain()
    .slotSharingGroup("g1")
    .map((_,1))
    .map(t=>WordPair(t._1,t._2))
    .print()
    
    fsEnv.execute("FlinkWordCountsQuickStart")