40岁程序员学Flink, 模拟事件处理
一,背景
上文已经介绍了Flink如何与Kafka对接,本文在上文的基础上,利用Flink实现事件监控。模拟场景:有很多台机器不停地把自身温度发送到Kafka,消息格式为 "机器名=温度",例如 "xyz=30"。Flink监控这些事件,检查温度是否达到某个阈值(本例中温度大于或等于26即触发),如果达到则输出提示信息。
二,准备条件
在Windows上启动Kafka,具体步骤见上文,地址如下:
https://blog.csdn.net/kanganrui/article/details/88635912
三,编码
1)在IDEA中新建Maven项目,并添加相关依赖,具体如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>flinkstudy</groupId>
    <artifactId>flinkcep</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <flink.version>1.7.2</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- The job is written against the Java CEP API (org.apache.flink.cep.CEP,
             org.apache.flink.cep.pattern.Pattern), which is provided by flink-cep.
             flink-cep-scala is only the Scala wrapper around that API. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
2)编写业务实体对象,具体如下:
package flinkstudy.application;
/***
* 通知对象,用于最后保存flink汇总结果
*/
public class Alert {
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Alert(String message) {
super();
this.message = message;
}
@Override
public String toString() {
return "Alert [message=" + message + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((message == null) ? 0 : message.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Alert other = (Alert) obj;
if (message == null) {
if (other.message != null)
return false;
} else if (!message.equals(other.message))
return false;
return true;
}
}
package flinkstudy.application;
/**
 * Base class for monitoring events: every event records the name of the machine it came from.
 *
 * <p>Equality is class-exact ({@code getClass()} comparison) and based on the machine name only;
 * subclasses add their own fields on top of this.
 */
public abstract class MonitoringEvent {
    private String machineName;

    /**
     * @param machineName name of the reporting machine; may be {@code null}
     */
    public MonitoringEvent(String machineName) {
        this.machineName = machineName;
    }

    public String getMachineName() {
        return machineName;
    }

    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        String otherName = ((MonitoringEvent) obj).machineName;
        if (machineName == null) {
            return otherName == null;
        }
        return machineName.equals(otherName);
    }

    @Override
    public int hashCode() {
        // Equivalent to the classic "31 * 1 + fieldHash" formula.
        int nameHash = (machineName != null) ? machineName.hashCode() : 0;
        return 31 + nameHash;
    }
}
package flinkstudy.application;
/**
 * Temperature reading reported by a single machine.
 *
 * <p>Provides a public no-argument constructor and getter/setter pairs for all fields (including
 * the inherited machine name) so Flink's type extractor can treat it as a POJO instead of
 * falling back to generic (Kryo) serialization.
 */
public class TemperatureEvent extends MonitoringEvent {
    private double temperature;

    /** Creates an empty event (machine name null, temperature 0.0); required by Flink's POJO rules. */
    public TemperatureEvent() {
        super(null);
    }

    /**
     * @param machineName name of the reporting machine
     */
    public TemperatureEvent(String machineName) {
        super(machineName);
    }

    /**
     * @param machineName name of the reporting machine
     * @param temperature the reported temperature
     */
    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    /** Combines the machine-name hash from the superclass with the temperature's bit pattern. */
    @Override
    public int hashCode() {
        final int prime = 31;
        long bits = Double.doubleToLongBits(temperature);
        return prime * super.hashCode() + (int) (bits ^ (bits >>> 32));
    }

    /**
     * Equal when the superclass considers the machine names equal (which also requires the exact
     * same class) and the temperatures have identical bit patterns.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!super.equals(obj)) {
            return false;
        }
        // super.equals already verified obj is non-null and of the same class.
        TemperatureEvent other = (TemperatureEvent) obj;
        return Double.doubleToLongBits(temperature) == Double.doubleToLongBits(other.temperature);
    }

    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName()=" + getMachineName()
                + "]";
    }
}
注意,这里一定要正确实现 equals 和 hashCode 方法,因为 Flink CEP 在比较和匹配事件时会调用这两个方法。
3)编写反序列化器,把Kafka消息反序列化成TemperatureEvent对象,具体如下:
package flinkstudy.application;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
/***
* 把收到的信息发序列化成对象
*/
public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> {
public TypeInformation<TemperatureEvent> getProducedType() {
return TypeExtractor.getForClass(TemperatureEvent.class);
}
public TemperatureEvent deserialize(byte[] arg0) throws IOException {
String str = new String(arg0, StandardCharsets.UTF_8);
String[] parts = str.split("=");
return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));
}
public boolean isEndOfStream(TemperatureEvent arg0) {
return false;
}
}
4)编写程序入口,实现事件监控
package flinkstudy.application;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * Flink CEP job that reads "machineName=temperature" messages from Kafka and prints an alert
 * for every reading at or above {@link #WARNING_THRESHOLD}.
 */
public class FlinkCEPJob {

    /** Kafka broker address of the local test setup. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    /** Kafka consumer group id. */
    private static final String GROUP_ID = "test";
    /** Topic carrying the "machineName=temperature" messages. */
    private static final String TOPIC = "temperature";
    /** Readings greater than or equal to this value raise an alert. */
    private static final double WARNING_THRESHOLD = 26.0;
    /** Name of the single pattern stage; also the key used to read the match back. */
    private static final String FIRST = "first";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<TemperatureEvent> inputEventStream = env.addSource(
                new FlinkKafkaConsumer<TemperatureEvent>(TOPIC, new EventDeserializationSchema(), kafkaProperties()));

        // Turn every matched reading into an alert that names the machine and its temperature.
        DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern(WARNING_THRESHOLD))
                .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> match) throws Exception {
                        // FIRST is a single-event stage without a quantifier, so the list holds
                        // exactly one event; counting it would always report "1".
                        TemperatureEvent event = match.get(FIRST).get(0);
                        return new Alert("Temperature Rise Detected:" + event.getMachineName()
                                + "=" + event.getTemperature());
                    }
                });

        patternStream.print();
        env.execute("CEP on Temperature Sensor");
    }

    /** Builds the Kafka consumer configuration. */
    private static Properties kafkaProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.setProperty("group.id", GROUP_ID);
        return properties;
    }

    /**
     * Builds a single-stage pattern that matches any event whose temperature is at least
     * {@code threshold}.
     */
    private static Pattern<TemperatureEvent, ?> warningPattern(final double threshold) {
        return Pattern.<TemperatureEvent>begin(FIRST)
                .where(new IterativeCondition<TemperatureEvent>() {
                    @Override
                    public boolean filter(TemperatureEvent event, Context<TemperatureEvent> context) throws Exception {
                        return event.getTemperature() >= threshold;
                    }
                })
                // within() bounds the time span between the first and last event of a match.
                // It only matters once further stages are added to this pattern.
                .within(Time.seconds(10));
    }
}
四 测试
1)在cmd中进入Kafka安装目录下的bin\windows目录,运行如下命令创建topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature
2)在同一个cmd窗口中运行如下命令,启动控制台生产者,模拟输入消息:
kafka-console-producer.bat --broker-list localhost:9092 --topic temperature
最终结果如下图:
3)运行java程序,最终结果如下图
可以看到程序检测到温度升高事件,并输出提示。
相关源码:https://gitee.com/kengan/40_programmers_learn_flink/tree/master/study3
推荐阅读