Flink中Periodic水印和Punctuated水印实现原理(源码分析)
在用户代码中,我们设置生成水印和事件时间的方法assigntimestampsandwatermarks()中这里有个方法的重载
我们传入的对象分为两种
assignerwithpunctuatedwatermarks(可以理解为每条数据都会产生水印,如果不想产生水印,返回一个null的水印)
assignerwithperiodicwatermarks(周期性的生成水印)
来看一下源码中是如何实现这两种水印的
二话不说打开org.apache.flink.streaming.runtime.operators.timestampsandpunctuatedwatermarksoperator.java
这个类的processelement方法
看到源码这里这段逻辑就 非常的清晰了
先通过用户的代码获取到事件时间,注入到element里面就直接往下个opeartor发送了
然后通过用户代码获取水印,这里会判断水印是否为null
不为null的就直接往下游emit 了
现在看一下assignerwithperiodicwatermarks如何周期的发送生成的水印
直接打开timestampsandperiodicwatermarksoperator.java这个类
这里先不看processelement()方法,先看open方法
可以看到它将 当前时间其实就是system.currenttimemillis()+ watermarkinterval水印间隔 注册作为了一个timer定时器
这样就知道了,当他过了这个水印间隔时间以后肯定会触发操作
来看一下这个间隔时间以后触发了什么操作
可以看到,他先是获取了当前的水印时间,然后直接emit出去了????
periodic模式明明是在接收数据的processelement()发送水印的
然后又再次注册了一个 当前时间+间隔的 timer,这样就无限的触发下去了
既然他在这里发送了水印,来看下他的processelement方法
果然他周期性的发送水印以后,接收数据的processelement()方法里面就没有发送水印了
只有获取事件时间的逻辑了