欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

Flink CEP 入门第一步

程序员文章站 2022-03-15 14:24:07
Flink CEP 入门第一步CEP 介绍场景描述需求分析主要逻辑源码分享测试结果参考地址CEP 介绍CEP(Complex Event Process) 直译过来就是复杂事件处理,具体到底有多复杂呢?基本上在时序数据场景中要监控某一些有特征的信息,可以理解为复杂事件。Flink 1.0 以后就开始支持复杂事件处理,可以在项目中通过引用 flink-cep.jar 包来做复杂事件处理。场景描述监控物联网中某设备上送到时序数据,对发现某设备的温度在10秒钟内有两次超过100度时,产生一个报警记录...

CEP 介绍

CEP(Complex Event Process) 直译过来就是复杂事件处理,具体到底有多复杂呢?基本上在时序数据场景中要监控某一些有特征的信息,可以理解为复杂事件。

Flink 1.0 以后就开始支持复杂事件处理,可以在项目中通过引用 flink-cep.jar 包来做复杂事件处理。

场景描述

  1. 监控物联网中某设备上送到时序数据,对发现某设备的温度在10秒钟内有两次超过100度时,产生一个报警记录;

  2. 对产生的报警信息进行二次监控,如果发现报警信息中两条信息的平均值在20秒内有递增的情况,要发出一条通知消息。

需求分析

  1. 这个需求场景针对的是某一设备的数据进行的报警分析,肯定要针对设备有一个唯一的设备标识

  2. 需求1中要监控设备上送的原始数据,对发现超过100度的数据时刻为开始,在接下来的10秒内如果又有一个超过100度的数据,则产生一个报警记录

  3. 需求2中要监控产生的报警数据的平均值,以20秒为间隔,如果平均值有递增的情况,发出一条通知消息

主要逻辑

  1. 定义 报警记录的匹配模式 Pattern.beging(“first”).where(value > 100).next(“second”).where(value > 100).within(10s)

  2. 对时序数据进行报警模式匹配,并按照设备标识进行分组,形成patternStream

  3. 对匹配到的报警数据做平均值计算,patternStream.select((map<String, List> element) ->{return (“first”.value+"second".value)/2})

  4. 定义 发送通知消息的匹配模式 Pattern.beging(“first”).next(“second”).within(20s)

  5. 对报警记录实行消息4定义的模式匹配,并按照设备标识进行分组,形成patternStream

  6. 对匹配到的数据进行逻辑分析,如果有平均值递增的情况,就发送通知信息

源码分享

    // Warning pattern: Two consecutive temperature events whose temperature is higher than the given threshold
    // appearing within a time interval of 10 seconds
    Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
            .subtype(TemperatureEvent.class)
            .where(new IterativeCondition<TemperatureEvent>() {
                private static final long serialVersionUID = -6301755149429716724L;

                @Override
                public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                }
            })
            .next("second")
            .subtype(TemperatureEvent.class)
            .where(new IterativeCondition<TemperatureEvent>() {
                private static final long serialVersionUID = 2392863109523984059L;

                @Override
                public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                    return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                }
            })
            .within(Time.seconds(10));

    // Create a pattern stream from our warning pattern
    PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
            inputEventStream.keyBy("rackID"),
            warningPattern);

    // Generate temperature warnings for each matched warning pattern
    DataStream<TemperatureWarning> warnings = tempPatternStream.select(
        (Map<String, List<MonitoringEvent>> pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
            TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);

            return new TemperatureWarning(first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2);
        }
    );

    // Alert pattern: Two consecutive temperature warnings appearing within a time interval of 20 seconds
    Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
            .next("second")
            .within(Time.seconds(20));

    // Create a pattern stream from our alert pattern
    PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
            warnings.keyBy("rackID"),
            alertPattern);

    // Generate a temperature alert only if the second temperature warning's average temperature is higher than
    // first warning's temperature
    DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
        (Map<String, List<TemperatureWarning>> pattern, Collector<TemperatureAlert> out) -> {
            TemperatureWarning first = pattern.get("first").get(0);
            TemperatureWarning second = pattern.get("second").get(0);

            if (first.getAverageTemperature() < second.getAverageTemperature()) {
                out.collect(new TemperatureAlert(first.getRackID()));
            }
        },
        TypeInformation.of(TemperatureAlert.class));

    // Print the warning and alert events to stdout
    warnings.print();
    alerts.print();

测试结果

2020-11-25 17:47:35:87  emit data: TemperatureEvent(5, 62.16286476572637)
2020-11-25 17:47:36:17  emit data: TemperatureEvent(5, 76.30725109897276)
2020-11-25 17:47:36:47  emit data: TemperatureEvent(5, 107.76770951345256)
2020-11-25 17:47:36:77  emit data: PowerEvent(3, 106.68654516819112)
2020-11-25 17:47:37:07  emit data: TemperatureEvent(3, 80.30137883422269)
2020-11-25 17:47:37:37  emit data: TemperatureEvent(4, 47.77738055334537)
2020-11-25 17:47:37:67  emit data: PowerEvent(7, 74.51323501555572)
2020-11-25 17:47:37:97  emit data: PowerEvent(9, 114.84340930132853)
2020-11-25 17:47:38:27  emit data: PowerEvent(3, 99.01726074676452)
2020-11-25 17:47:38:57  emit data: TemperatureEvent(7, 65.44951371049709)
2020-11-25 17:47:38:87  emit data: TemperatureEvent(8, 97.25405709950533)
2020-11-25 17:47:39:17  emit data: PowerEvent(9, 103.84754449954623)
2020-11-25 17:47:39:47  emit data: PowerEvent(4, 94.60893342305071)
2020-11-25 17:47:39:77  emit data: TemperatureEvent(2, 78.77024073146522)
2020-11-25 17:47:40:07  emit data: TemperatureEvent(2, 73.52434875689119)
2020-11-25 17:47:40:37  emit data: PowerEvent(4, 86.76448452660071)
2020-11-25 17:47:40:67  emit data: TemperatureEvent(8, 101.17490865381467)
2020-11-25 17:47:40:97  emit data: TemperatureEvent(5, 101.49599620863171)
TemperatureWarning(5, 104.63185286104213)
2020-11-25 17:47:41:27  emit data: TemperatureEvent(5, 115.554622385676)
TemperatureWarning(5, 108.52530929715385)
TemperatureAlert(5)
2020-11-25 17:47:41:57  emit data: TemperatureEvent(3, 72.9746590530073)
2020-11-25 17:47:41:88  emit data: PowerEvent(4, 102.61427127744209)

参考地址

源码可参考https://github.com/tillrohrmann/cep-monitoring.git,我这里对其进行了简单的修改。

本文地址:https://blog.csdn.net/wowSpark/article/details/110142419

相关标签: Flink flinkcep