flink Window机制
什么是 Window
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的5分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
窗口可以是时间驱动的(Time Window,例如:每30秒钟),也可以是数据驱动的(Count Window,例如:每一百条数据)。Flink窗口分为:
- 滚动窗口(Tumbling Window,固定时间窗大小,无重叠)
- 跳跃窗口(Sliding Window,固定时间窗大小,有重叠)
- 会话窗口(Session Window,非活动时间间隔)
- 全局窗口(Global Window,相同键的所有元素分配给同一个全局窗口)
Time Window
首先,Flink 提出了三种时间的概念,分别是event time(事件时间:事件发生时的时间),ingestion time(摄取时间:事件进入流处理系统的时间),processing time(处理时间:消息被计算处理的时间,默认)。Flink 中窗口机制和时间类型是完全解耦的,也就是说当需要改变时间类型时不需要更改窗口逻辑相关的代码。
Tumbling Windows
滚动时间窗口会将记录分配到连续、不重叠且具有固定时间长度的窗口中。例如以5分钟为间隔统计一次访问我们网站的pv数,滚动时间窗口会将记录按照每5分钟进行分组。
java Tumbling Window使用如下:
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
Sliding Windows
每个跳跃时间窗口都有一个固定大小的时间长度(通过第一个 interval 参数定义)和一个跳跃间隔(通过第二个 interval 参数定义)。例如每隔5分钟统计过去10分钟的网站pv数。
Java Sliding Window使用:
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
Session Windows
会话时间窗口没有固定长度,其边界是通过一个“非活动时间间隔”来指定,即如果超过一段时间没有满足现有窗口条件的数据到来,则判定窗口结束。例如给定一个时间间隔为30分钟的会话时间窗口,如果某条记录到来之前已经有超过30分钟没有记录,则会开启一个新的窗口实例(否则该记录会被加到已有窗口实例中)。同样,如果再出现连续30分钟的记录真空期,则当前窗口实例会被关闭。
Java Session Time使用:
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
Global Windows
全局窗口分配器将具有相同键的所有元素分配给同一个全局窗口。此窗口方案仅在指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然结束标志。
Java Global Windows使用:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
推荐阅读
-
Window程序内部机制(上)
-
Android Window 机制探索
-
flink Window机制
-
window系统下Node.js安装以及环境变量配置
-
window10下mysql8.0安装配置
-
搭建一个简单的redis-sentinel(哨兵机制)集群
-
工作环境移植到window7下的记录 博客分类: 业界 工作OracleXPDelphiDreamweaver
-
工作环境移植到window7下的记录 博客分类: 业界 工作OracleXPDelphiDreamweaver
-
redis RDB和AOF持久化机制
-
window emacs esense 安装 博客分类: emacs erlangemacs