欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

40岁程序员学Flink, 模拟事件处理

程序员文章站 2022-04-24 12:39:30
...

一,背景

      上文已经介绍了flink如何与kafka对接,本文在上文的基础上实现利用flink进行事件监控。模拟场景,有很多机器,不停的把机器的温度发送到kafka,发送消息格式 "机器名=温度",例如 "xyz=30", flink监控该事件,并检查温度是否大于某个值,如果大于则输出提示信息。

 

二,准备条件

在window 启动kafka,具体见上文,地址如下:

https://blog.csdn.net/kanganrui/article/details/88635912

 

三,编码

1)在IDEA新见maven项目,增加相关依赖,具体如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flinkstudy</groupId>
    <artifactId>flinkcep</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.7.2</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>

 

2)编写业务实体对象,具体如下:

package flinkstudy.application;

/***
 * 通知对象,用于最后保存flink汇总结果
 */
public class Alert {

    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Alert(String message) {
        super();
        this.message = message;
    }

    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((message == null) ? 0 : message.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Alert other = (Alert) obj;
        if (message == null) {
            if (other.message != null)
                return false;
        } else if (!message.equals(other.message))
            return false;
        return true;
    }
}

 

package flinkstudy.application;


/***
 * 监控事件抽象类
 */
public abstract class MonitoringEvent {

    private String machineName;

    public String getMachineName() {
        return machineName;
    }

    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        MonitoringEvent other = (MonitoringEvent) obj;
        if (machineName == null) {
            if (other.machineName != null)
                return false;
        } else if (!machineName.equals(other.machineName))
            return false;
        return true;
    }

    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }

}

package flinkstudy.application;


/***
 * 温度监控事件
 */
public class TemperatureEvent extends MonitoringEvent {

    public TemperatureEvent(String machineName) {
        super(machineName);
    }

    private double temperature;

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        long temp;
        temp = Double.doubleToLongBits(temperature);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!super.equals(obj))
            return false;
        if (getClass() != obj.getClass())
            return false;
        TemperatureEvent other = (TemperatureEvent) obj;
        if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))
            return false;
        return true;
    }

    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName()=" + getMachineName()
                + "]";
    }

}

 

注意,这里一定要定义 equals方法和 hashCode,因为flink会调用这两个方法。

3)编写反序列化器,实现把消息反序列成TemperatureEvent对象,具体如下:

package flinkstudy.application;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

/***
 * 把收到的信息发序列化成对象
 */
public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> {

    public TypeInformation<TemperatureEvent> getProducedType() {
        return TypeExtractor.getForClass(TemperatureEvent.class);
    }

    public TemperatureEvent deserialize(byte[] arg0) throws IOException {
        String str = new String(arg0, StandardCharsets.UTF_8);

        String[] parts = str.split("=");
        return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));
    }

    public boolean isEndOfStream(TemperatureEvent arg0) {
        return false;
    }

}

 

4)编写程序入口,实现事件监控

package flinkstudy.application;


import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class FlinkCEPJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        DataStream<TemperatureEvent> inputEventStream = env.addSource(
                new FlinkKafkaConsumer<TemperatureEvent>("temperature", new EventDeserializationSchema(), properties));

        /***
         * 定义过滤条件
         */
        Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
                .subtype(TemperatureEvent.class).where(
                   new IterativeCondition<TemperatureEvent>() {

                       @Override
                       public boolean filter(TemperatureEvent temperatureEvent, Context<TemperatureEvent> context) throws Exception {
                           if (temperatureEvent.getTemperature() >= 26.0) {
                               return true;
                           }
                           return false;
                       }
                    }
                ).within(Time.seconds(10));


        /**
         * 对过滤后的条件进行结果汇总
         *
         */
        DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
                .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                        return new Alert("Temperature Rise Detected:" + event.get("first").size()+" times");
                    }
                });

        patternStream.print();
        env.execute("CEP on Temperature Sensor");


    }
}

 

四 测试

1)在cmd中进入kafka安装目录下面的window目录,运行如下命令建立topic

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature

2) 相同的cmd中运行如下命令,打开消息输入界面,模拟输入消息

kafka-console-producer.bat --broker-list localhost:9092 --topic temperature

最终结果如下图:

40岁程序员学Flink, 模拟事件处理

3)运行java程序,最终结果如下图

40岁程序员学Flink, 模拟事件处理可以看到程序检测到温度升高事件,并输出提示。

 

相关源码:https://gitee.com/kengan/40_programmers_learn_flink/tree/master/study3