Flink中的Window
程序员文章站
2024-03-19 22:49:28
...
Flink中的Window
一、Window的分类
1、Time Window
-
滑动窗口:SlidingEventTimeWindows(.window(window assigner)
由固定的窗口大小和滑动间隔组成,特点:时间对齐,窗口长度固定,可以由重叠
-
滚动窗口:TumingEvenTimeWindows (.window(window assigner)
将数据依据固定长度对数据进行切片*`特点:时间对齐,窗口长度固定,没有重叠,*
-
会话窗口:
时间无对齐,只要是在设定的时间间隔内没有数据出现,则会生成一个会话窗口
2、Count Window
-
滑动窗口 :countWindow(Int windowsize)
由窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果
-
滚动窗口:countWindow(Int windowsize,int Sliding size)
3、Window函数的使用步骤
- 方式一:
- keyBy()
- window()
- 【trigger()】 //触发器
- 【evitor()】 // 移除器
- 【allowedLateness()】 //允许处理迟到的数据
- 【sideOutputLateDate()】 //迟到的数据放入侧输出流
- 【getSideOutput()】 //获取侧数据流数据
- windowFunction() //reduce、sum、max、min、aggregate
- 方式二:
- windowAll() //不分区处理,
4、窗口函数的分类
-
增量聚合函数(incremental aggregation functions)
- 每条数据到来就进行计算,保持一个简单的状态,仍为流式处理
- ReduceFunction, AggregateFunction
-
全窗口函数(full window functions)
- 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据,批处理
- ProcessWindowFunction、apply()
二、Flink中的时间语义
1、Flink中时间语义的分类
- Event Time : 时间产生时的时间
- Ingestion Time: 数据进入Flink source中的时间
- Processiong Time:数据被Flink处理的时间(Flink默认的时间语义)
2、在代码中设置EvenTime(2步)
//创建Stream环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//获取kafkastream
val inputStream: DataStream[SensorReading] = env.addSource(new MySensorReadingSource)
//1-->val mapStream: DataStream[(String, Double)] = inputStream.map(new MyMapper)
inputStream.assignAscendingTimestamps(_.timeStamp) //设置成自增式的时间戳,适合有序的数据流,无需考虑数据混乱问题
//2-->调用 assignTimestampAndWatermarks 方法,传入 BoundedOutOfOrdernessTimestampExtractor指定 watermark
inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {//waterMark一般设置成最大混乱程度(超前数据与后续晚到数据的时间差),当然也要结合实际需求
override def extractTimestamp(element: SensorReading): Long = element.timeStamp * 10000
})
3、水位线(waterMark)
-
解决的问题:乱序问题造成的计算结果问题
遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
-
什么是waterMark:
- Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发
- Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark 机制结合 window 来实现;
- 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
- watermark 用来让程序自己平衡延迟和结果正确性
4、waterMark的特点
- waterMark是一条特殊的数据记录,为数据内的maxEventTimestamp - 设定的延时时长
- waterMark必须是单调递增的,以确保任务的事件时间时钟在向前推进,而不是在后退
- watermark 与数据的时间戳相关
5、TimestampAssigner的对比
- AssignerWithPeriodicWatermarks
- 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中
- 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置
- 升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性 watermark 的。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每隔5秒产生一个watermark
env.getConfig.setAutoWatermarkInterval(5000)
- AssignerWithPunctuatedWatermarks
- 没有时间周期规律,可打断的生成 watermark
6、Flink是如何解决乱序问题的(三重机制)
- WaterMark机制:在通过设置WaterMark,可以将调整窗口的关闭时间,尽可能保证WaterMark前的数据全部进入,但仍可能存在到达waterMark后无法进入窗口的问题,则需要依靠第二重机制
- 窗口的allowedLateness():设置了窗口的等到迟到的数据后,则有晚到的数据进入窗口时,会触发一次聚合操作,即每进入一次,就会有新的聚合结果输出;但到等待迟到数据时间到时,窗口关闭,后续迟到数据无法进入窗口,需要第三重机制
- 开启窗口的sideOutput模式,将迟到无法进入窗口的数据收集,后续取出进行批处理
三、Flink中的过程API(ProcessAPI)
1、ProcessAPI的特点
- (ctx)可以访问事件的时间戳信息和waterMark信息
- RichProcess具有一定的周期,可以注册状态,获取和更改状态
- (TimerService 和 定时器(Timers))可以通过timer来设定超时事件,方式是设定定时器和定时执行的事件
- (侧输出流(SideOutput))可以实现数据分流,也就是通过侧输出流获取多条支流,且流的数据类型可以不同
- CoProcessFunction
2、常用的ProcessAPI
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
3、使用案例
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 读取数据
val inputStream: DataStream[String] = env.socketTextStream("localhost", 7777)
val dataStream = inputStream
.map(line => {
val arr = line.split(",")
SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
})
// ********************分流的案例**********************
// process function示例
// val processedStream = dataStream
// .keyBy("id")
// .process(new MyKeyedProcess)
//
// processedStream.getSideOutput(new OutputTag[Int]("side")).print("side")
// 需求:检测10秒钟内温度是否连续上升,如果上升,那么报警
val warningStream = dataStream
.keyBy("id")
.process( new TempIncreaseWarning(10000L) )
warningStream.print()
env.execute("process function test")
}
}
// 自定义Keyed Process Function,实现10秒内温度连续上升报警检测
class TempIncreaseWarning(interval: Long) extends KeyedProcessFunction[Tuple, SensorReading, String]{
// 首先定义状态
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))
lazy val curTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("current-timer-ts", classOf[Long]))
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#Context, out: Collector[String]): Unit = {
// 取出状态
val lastTemp = lastTempState.value()
val curTimerTs = curTimerTsState.value()
lastTempState.update(value.temperature)
// 如果温度上升,并且没有定时器,那么注册一个10秒后的定时器
if( value.temperature > lastTemp && curTimerTs == 0 ){
val ts = ctx.timerService().currentProcessingTime() + interval
ctx.timerService().registerProcessingTimeTimer(ts)
// 更新timerTs状态
curTimerTsState.update(ts)
} else if( value.temperature < lastTemp ){
// 如果温度下降,那么删除定时器
ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
// 清空状态
curTimerTsState.clear()
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
// 定时器触发,说明10秒内没有温度下降,报警
out.collect(s"传感器 ${ctx.getCurrentKey} 的温度值已经连续 ${interval/1000} 秒上升了")
// 清空定时器时间戳状态
curTimerTsState.clear()
}
}
// 自定义Keyed ProcessFunction
class MyKeyedProcess extends KeyedProcessFunction[Tuple, SensorReading, Int]{
var myState: ListState[Int] = _
override def open(parameters: Configuration): Unit = {
myState = getRuntimeContext.getListState(new ListStateDescriptor[Int]("my-state", classOf[Int]))
}
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[Tuple, SensorReading, Int]#Context, out: Collector[Int]): Unit = {
myState.get()
myState.add(10)
myState.update(new util.ArrayList[Int]())
// 侧输出流
ctx.output(new OutputTag[Int]("side"), 10)
// 获取当前键
ctx.getCurrentKey
// 获取时间相关
ctx.timestamp()
ctx.timerService().currentWatermark()
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 10)
ctx.timerService().deleteProcessingTimeTimer(1000L)
}
// 定时器触发时的操作定义,若有多个Timer,则需要把Timer也注册成状态集合,然后通过判定属于哪个Timer,从而执行对应的代码
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, SensorReading, Int]#OnTimerContext, out: Collector[Int]): Unit = {
println("timer occur")
ctx.output(new OutputTag[Int]("side"), 15)
out.collect(23)
}
override def close(): Unit = {}
}