Getting Started with Flink CEP
Introduction to CEP
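CEP (Complex Event Processing) literally means processing complex events. How complex is "complex"? Roughly speaking, in time-series scenarios, any characteristic pattern of events that you want to monitor can be treated as a complex event.
Flink has supported complex event processing since version 1.0. To use it in a project, add the flink-cep library (flink-cep.jar) as a dependency.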
Scenario
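- Monitor the time-series data reported by IoT devices: whenever a device's temperature exceeds 100 degrees twice within 10 seconds, generate a warning record.
- Monitor the generated warnings in a second stage: if the average temperatures of two warnings increase within 20 seconds, send a notification.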
Requirements Analysis
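- The analysis is per device, so every device needs a unique identifier by which its data can be grouped.
- Requirement 1 monitors the raw readings reported by a device: starting from a reading above 100 degrees, if another reading above 100 degrees arrives within the next 10 seconds, generate a warning record.
- Requirement 2 monitors the average temperatures carried by the warnings: if the average increases between two warnings that occur within 20 seconds of each other, send a notification.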
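To make requirement 1 concrete without any Flink machinery, here is a plain-Java sketch of what the warning stage is expected to compute. It is an illustration, not the Flink implementation: the `Reading` and `Warning` records, the `WarningDetector` class, and millisecond timestamps are hypothetical names introduced here. It groups readings by device (the role of `keyBy`), pairs each reading only with the device's immediately preceding one, and emits a warning carrying the pair's average when both are at or above the threshold and at most 10 seconds apart.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reading: device ID, event time in milliseconds, temperature.
record Reading(int deviceId, long timeMs, double temperature) {}

// Hypothetical warning: device ID, time of the second reading, average of the pair.
record Warning(int deviceId, long timeMs, double averageTemperature) {}

class WarningDetector {
    static final double THRESHOLD = 100.0; // ">=" mirrors the comparison in the source code
    static final long WINDOW_MS = 10_000;  // 10-second window (boundary handling simplified)

    static List<Warning> detect(List<Reading> readings) {
        // Group by device, preserving arrival order within each device (the keyBy step).
        Map<Integer, List<Reading>> byDevice = new LinkedHashMap<>();
        for (Reading r : readings) {
            byDevice.computeIfAbsent(r.deviceId(), k -> new ArrayList<>()).add(r);
        }
        List<Warning> warnings = new ArrayList<>();
        for (List<Reading> series : byDevice.values()) {
            // Only directly adjacent readings of the same device are paired.
            for (int i = 1; i < series.size(); i++) {
                Reading first = series.get(i - 1);
                Reading second = series.get(i);
                boolean bothHot = first.temperature() >= THRESHOLD
                        && second.temperature() >= THRESHOLD;
                boolean inWindow = second.timeMs() - first.timeMs() <= WINDOW_MS;
                if (bothHot && inWindow) {
                    double avg = (first.temperature() + second.temperature()) / 2;
                    warnings.add(new Warning(first.deviceId(), second.timeMs(), avg));
                }
            }
        }
        return warnings;
    }
}
```

For example, readings of 107 and 101 from device 5 taken 4.5 s apart yield one warning with an average of 104.0, while a further reading of 130 taken 15.5 s after the 101 does not pair with it because it falls outside the window.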
Main Logic
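- Define the warning pattern: Pattern.begin("first").where(value > 100).next("second").where(value > 100).within(10s)
- Apply the warning pattern to the time-series stream keyed by device ID, producing a PatternStream.
- Compute the average temperature of each matched pair: patternStream.select((Map<String, List> match) -> (first.value + second.value) / 2)
- Define the notification pattern: Pattern.begin("first").next("second").within(20s)
- Apply the notification pattern from the previous step to the warning records, keyed by device ID, producing a second PatternStream.
- Inspect each matched pair of warnings and send a notification if the average temperature increased.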
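The last step explains why the source code uses flatSelect rather than select for the alert stage: select must return exactly one result per match, while here a match may produce an alert or nothing. The plain-Java sketch below mimics the shape of that callback without depending on Flink. The match is a Map keyed by the pattern names "first" and "second", and a java.util.function.Consumer stands in for Flink's Collector; `FlatSelectSketch` and its nested records are hypothetical stand-ins for TemperatureWarning and TemperatureAlert.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class FlatSelectSketch {
    // Hypothetical stand-ins for TemperatureWarning and TemperatureAlert.
    record Warning(int rackId, double averageTemperature) {}
    record Alert(int rackId) {}

    // Called once per matched pair; may emit zero or one alert through `out`,
    // which plays the role of Flink's Collector.
    static void onMatch(Map<String, List<Warning>> match, Consumer<Alert> out) {
        Warning first = match.get("first").get(0);
        Warning second = match.get("second").get(0);
        // Alert only when the average temperature rose from the first warning to the second.
        if (first.averageTemperature() < second.averageTemperature()) {
            out.accept(new Alert(first.rackId()));
        }
    }
}
```

A rising pair (104.63 then 108.53) emits one alert; the reversed pair emits nothing, which a select-style function returning a single value could not express.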
Source Code
```java
// Imports used by the snippet (Flink 1.x DataStream and CEP APIs)
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// The statements below belong in the job's main method. As in the cep-monitoring project,
// they assume inputEventStream is a DataStream<MonitoringEvent>, TEMPERATURE_THRESHOLD is a
// double constant (100 for this scenario), and MonitoringEvent, TemperatureEvent,
// TemperatureWarning and TemperatureAlert are POJOs with a rackID field.

// Warning pattern: two consecutive temperature events whose temperature is at or above the
// threshold, appearing within a time interval of 10 seconds
Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
        .subtype(TemperatureEvent.class)
        .where(new IterativeCondition<TemperatureEvent>() {
            private static final long serialVersionUID = -6301755149429716724L;

            @Override
            public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                return value.getTemperature() >= TEMPERATURE_THRESHOLD;
            }
        })
        .next("second") // strict contiguity: must be the very next event of the same rack
        .subtype(TemperatureEvent.class)
        .where(new IterativeCondition<TemperatureEvent>() {
            private static final long serialVersionUID = 2392863109523984059L;

            @Override
            public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                return value.getTemperature() >= TEMPERATURE_THRESHOLD;
            }
        })
        .within(Time.seconds(10));

// Create a pattern stream from the warning pattern, keyed by rack.
// Field-expression keyBy as in the original; newer Flink 1.x releases deprecate it in favor of a KeySelector.
PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
        inputEventStream.keyBy("rackID"),
        warningPattern);

// Generate one temperature warning per matched pair, carrying the pair's average temperature
DataStream<TemperatureWarning> warnings = tempPatternStream.select(
        (Map<String, List<MonitoringEvent>> pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
            TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);
            return new TemperatureWarning(first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2);
        }
);

// Alert pattern: two consecutive temperature warnings appearing within a time interval of 20 seconds
Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
        .next("second")
        .within(Time.seconds(20));

// Create a pattern stream from the alert pattern, keyed by rack
PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
        warnings.keyBy("rackID"),
        alertPattern);

// Generate a temperature alert only if the second warning's average temperature is higher than
// the first warning's average temperature; flatSelect allows emitting nothing for a match
DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
        (Map<String, List<TemperatureWarning>> pattern, Collector<TemperatureAlert> out) -> {
            TemperatureWarning first = pattern.get("first").get(0);
            TemperatureWarning second = pattern.get("second").get(0);
            if (first.getAverageTemperature() < second.getAverageTemperature()) {
                out.collect(new TemperatureAlert(first.getRackID()));
            }
        },
        // The output type must be given explicitly: it cannot be extracted from a lambda's Collector parameter
        TypeInformation.of(TemperatureAlert.class));

// Print the warning and alert events to stdout
warnings.print();
alerts.print();
```
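One detail of the warning pattern is worth spelling out: next() requires strict contiguity, so in the keyed stream the second hot TemperatureEvent must be the very next event for that rack. Because the input stream is keyed on rackID and carries both TemperatureEvent and PowerEvent, a PowerEvent (or a cooler reading) for the same rack arriving in between breaks the partial match, whereas followedBy() (relaxed contiguity) skips non-matching events. The plain-Java sketch below counts matches under both rules while ignoring the time window; `ContiguitySketch` and its methods are hypothetical names, and it only approximates Flink's matching semantics.

```java
import java.util.List;

class ContiguitySketch {
    // Hypothetical event: a type tag ("Temperature" or "Power") and a value.
    record Event(String type, double value) {}

    // The where-condition of both pattern steps: a TemperatureEvent at or above 100.
    static boolean hot(Event e) {
        return e.type().equals("Temperature") && e.value() >= 100.0;
    }

    // Strict contiguity (like next()): the second hot event must immediately follow the first.
    static int strictMatches(List<Event> events) {
        int count = 0;
        for (int i = 1; i < events.size(); i++) {
            if (hot(events.get(i - 1)) && hot(events.get(i))) {
                count++;
            }
        }
        return count;
    }

    // Relaxed contiguity (like followedBy()): each hot event pairs with the next hot event,
    // skipping non-matching events in between.
    static int relaxedMatches(List<Event> events) {
        int count = 0;
        for (int i = 0; i < events.size(); i++) {
            if (!hot(events.get(i))) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (hot(events.get(j))) {
                    count++;
                    break;
                }
            }
        }
        return count;
    }
}
```

For one rack's sequence Temperature 107, Power 120, Temperature 101, Temperature 115, the strict rule finds only the 101/115 pair, while the relaxed rule also pairs 107 with 101.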
Test Results
2020-11-25 17:47:35:87 emit data: TemperatureEvent(5, 62.16286476572637)
2020-11-25 17:47:36:17 emit data: TemperatureEvent(5, 76.30725109897276)
2020-11-25 17:47:36:47 emit data: TemperatureEvent(5, 107.76770951345256)
2020-11-25 17:47:36:77 emit data: PowerEvent(3, 106.68654516819112)
2020-11-25 17:47:37:07 emit data: TemperatureEvent(3, 80.30137883422269)
2020-11-25 17:47:37:37 emit data: TemperatureEvent(4, 47.77738055334537)
2020-11-25 17:47:37:67 emit data: PowerEvent(7, 74.51323501555572)
2020-11-25 17:47:37:97 emit data: PowerEvent(9, 114.84340930132853)
2020-11-25 17:47:38:27 emit data: PowerEvent(3, 99.01726074676452)
2020-11-25 17:47:38:57 emit data: TemperatureEvent(7, 65.44951371049709)
2020-11-25 17:47:38:87 emit data: TemperatureEvent(8, 97.25405709950533)
2020-11-25 17:47:39:17 emit data: PowerEvent(9, 103.84754449954623)
2020-11-25 17:47:39:47 emit data: PowerEvent(4, 94.60893342305071)
2020-11-25 17:47:39:77 emit data: TemperatureEvent(2, 78.77024073146522)
2020-11-25 17:47:40:07 emit data: TemperatureEvent(2, 73.52434875689119)
2020-11-25 17:47:40:37 emit data: PowerEvent(4, 86.76448452660071)
2020-11-25 17:47:40:67 emit data: TemperatureEvent(8, 101.17490865381467)
2020-11-25 17:47:40:97 emit data: TemperatureEvent(5, 101.49599620863171)
TemperatureWarning(5, 104.63185286104213)
2020-11-25 17:47:41:27 emit data: TemperatureEvent(5, 115.554622385676)
TemperatureWarning(5, 108.52530929715385)
TemperatureAlert(5)
2020-11-25 17:47:41:57 emit data: TemperatureEvent(3, 72.9746590530073)
2020-11-25 17:47:41:88 emit data: PowerEvent(4, 102.61427127744209)
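The logged values can be checked by hand. For rack 5, the readings 107.77 and 101.50 are at or above 100, both fall within 10 seconds, and no other rack-5 event appears between them in the log, so they form the first warning; 101.50 and the following 115.55 form the second. The alert fires because the second average is larger than the first:

```latex
\begin{aligned}
w_1 &= \frac{107.76770951345256 + 101.49599620863171}{2} \approx 104.63185286104213 \\
w_2 &= \frac{101.49599620863171 + 115.554622385676}{2} \approx 108.52530929715385 \\
w_2 &> w_1 \;\Rightarrow\; \texttt{TemperatureAlert(5)}
\end{aligned}
```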
References
The source code is adapted, with minor modifications, from https://github.com/tillrohrmann/cep-monitoring.git.
Original article: https://blog.csdn.net/wowSpark/article/details/110142419