欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink中的Window

程序员文章站 2024-03-19 22:49:28
...

Flink中的Window

一、Window的分类

1、Time Window

  • 滑动窗口:SlidingEventTimeWindows(.window(window assigner)

    由固定的窗口大小和滑动间隔组成,特点:时间对齐,窗口长度固定,可以由重叠

  • 滚动窗口:TumingEvenTimeWindows (.window(window assigner)

    将数据依据固定长度对数据进行切片*`特点:时间对齐,窗口长度固定,没有重叠,*

  • 会话窗口:

    时间无对齐只要是在设定的时间间隔内没有数据出现则会生成一个会话窗口

2、Count Window

  • 滑动窗口 :countWindow(Int windowsize)

    由窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果

  • 滚动窗口:countWindow(Int windowsize,int Sliding size)

3、Window函数的使用步骤

  • 方式一:
    • keyBy()
    • window()
    • 【trigger()】 //触发器
    • 【evitor()】 // 移除器
    • 【allowedLateness()】 //允许处理迟到的数据
    • 【sideOutputLateDate()】 //迟到的数据放入侧输出流
    • 【getSideOutput()】 //获取侧数据流数据
    • windowFunction() //reduce、sum、max、min、aggregate
  • 方式二:
    • windowAll() //不分区处理,

4、窗口函数的分类

  • 增量聚合函数(incremental aggregation functions)

    • 每条数据到来就进行计算,保持一个简单的状态,仍为流式处理
    • ReduceFunction, AggregateFunction
  • 全窗口函数(full window functions)

    • 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据,批处理
    • ProcessWindowFunction、apply()

二、Flink中的时间语义

1、Flink中时间语义的分类

  • ​ Event Time : 时间产生时的时间
  • ​ Ingestion Time: 数据进入Flink source中的时间
  • ​ Processiong Time:数据被Flink处理的时间(Flink默认的时间语义)

2、在代码中设置EvenTime(2步)

    //创建Stream环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //获取kafkastream
    val inputStream: DataStream[SensorReading] = env.addSource(new MySensorReadingSource)
    //1-->val mapStream: DataStream[(String, Double)] = inputStream.map(new MyMapper)
    inputStream.assignAscendingTimestamps(_.timeStamp) //设置成自增式的时间戳,适合有序的数据流,无需考虑数据混乱问题
    //2-->调用 assignTimestampAndWatermarks 方法,传入 BoundedOutOfOrdernessTimestampExtractor指定 watermark 
    inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {//waterMark一般设置成最大混乱程度(超前数据与后续晚到数据的时间差),当然也要结合实际需求
      override def extractTimestamp(element: SensorReading): Long = element.timeStamp * 10000
    })

3、水位线(waterMark)

  • 解决的问题:乱序问题造成的计算结果问题

    遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口

  • 什么是waterMark:

    • Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发
    • Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark 机制结合 window 来实现;
    • 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
    • watermark 用来让程序自己平衡延迟和结果正确性

4、waterMark的特点

  1. waterMark是一条特殊的数据记录,为数据内的maxEventTimestamp - 设定的延时时长
  2. waterMark必须是单调递增的,以确保任务的事件时间时钟在向前推进,而不是在后退
  3. watermark 与数据的时间戳相关

5、TimestampAssigner的对比

  • AssignerWithPeriodicWatermarks
    • 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中
    • 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置
    • 升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性 watermark 的。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 每隔5秒产生一个watermark
env.getConfig.setAutoWatermarkInterval(5000)
  • AssignerWithPunctuatedWatermarks
    • 没有时间周期规律,可打断的生成 watermark

6、Flink是如何解决乱序问题的(三重机制)

  1. WaterMark机制:在通过设置WaterMark,可以将调整窗口的关闭时间,尽可能保证WaterMark前的数据全部进入,但仍可能存在到达waterMark后无法进入窗口的问题,则需要依靠第二重机制
  2. 窗口的allowedLateness():设置了窗口的等到迟到的数据后,则有晚到的数据进入窗口时,会触发一次聚合操作,即每进入一次,就会有新的聚合结果输出;但到等待迟到数据时间到时,窗口关闭,后续迟到数据无法进入窗口,需要第三重机制
  3. 开启窗口的sideOutput模式,将迟到无法进入窗口的数据收集,后续取出进行批处理

三、Flink中的过程API(ProcessAPI)

1、ProcessAPI的特点

  • (ctx)可以访问事件的时间戳信息和waterMark信息
  • RichProcess具有一定的周期,可以注册状态,获取和更改状态
  • (TimerService 和 定时器(Timers))可以通过timer来设定超时事件,方式是设定定时器和定时执行的事件
  • (侧输出流(SideOutput))可以实现数据分流,也就是通过侧输出流获取多条支流,且流的数据类型可以不同
  • CoProcessFunction

2、常用的ProcessAPI

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

3、使用案例

object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 读取数据
    val inputStream: DataStream[String] = env.socketTextStream("localhost", 7777)
    val dataStream = inputStream
      .map(line => {
        val arr = line.split(",")
        SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
      })

//    ********************分流的案例**********************
//    process function示例
//    val processedStream = dataStream
//      .keyBy("id")
//      .process(new MyKeyedProcess)
//
//    processedStream.getSideOutput(new OutputTag[Int]("side")).print("side")

    // 需求:检测10秒钟内温度是否连续上升,如果上升,那么报警
    val warningStream = dataStream
      .keyBy("id")
      .process( new TempIncreaseWarning(10000L) )

    warningStream.print()
    env.execute("process function test")
  }
}

// 自定义Keyed Process Function,实现10秒内温度连续上升报警检测
class TempIncreaseWarning(interval: Long) extends KeyedProcessFunction[Tuple, SensorReading, String]{
  // 首先定义状态
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
  lazy val curTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("current-timer-ts", classOf[Long]))

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#Context, out: Collector[String]): Unit = {
      
    // 取出状态
    val lastTemp = lastTempState.value()
    val curTimerTs = curTimerTsState.value()
    lastTempState.update(value.temperature)

    // 如果温度上升,并且没有定时器,那么注册一个10秒后的定时器
    if( value.temperature > lastTemp && curTimerTs == 0 ){
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts)
      // 更新timerTs状态
      curTimerTsState.update(ts)
    } else if( value.temperature < lastTemp ){
      // 如果温度下降,那么删除定时器
      ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
      // 清空状态
      curTimerTsState.clear()
    }
  }
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    // 定时器触发,说明10秒内没有温度下降,报警
    out.collect(s"传感器 ${ctx.getCurrentKey} 的温度值已经连续 ${interval/1000} 秒上升了")
    // 清空定时器时间戳状态
    curTimerTsState.clear()
  }
}

// 自定义Keyed ProcessFunction
class MyKeyedProcess extends KeyedProcessFunction[Tuple, SensorReading, Int]{

  var myState: ListState[Int] = _

  override def open(parameters: Configuration): Unit = {
    myState = getRuntimeContext.getListState(new ListStateDescriptor[Int]("my-state", classOf[Int]))
  }

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[Tuple, SensorReading, Int]#Context, out: Collector[Int]): Unit = {
    myState.get()
    myState.add(10)
    myState.update(new util.ArrayList[Int]())

    // 侧输出流
    ctx.output(new OutputTag[Int]("side"), 10)
    // 获取当前键
    ctx.getCurrentKey
    // 获取时间相关
    ctx.timestamp()
    ctx.timerService().currentWatermark()
    ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 10)
    ctx.timerService().deleteProcessingTimeTimer(1000L)
  }
    
  // 定时器触发时的操作定义,若有多个Timer,则需要把Timer也注册成状态集合,然后通过判定属于哪个Timer,从而执行对应的代码
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, SensorReading, Int]#OnTimerContext, out: Collector[Int]): Unit = {
    println("timer occur")
    ctx.output(new OutputTag[Int]("side"), 15)
    out.collect(23)
  }
    
  override def close(): Unit = {}
}
相关标签: flink