A Simple Use of Flink's Session Window
程序员文章站
2022-03-14 22:49:27
1. The Concept of the Session Window
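In Flink, a session window groups the elements of one key by periods of activity. It has no fixed start or length, and a session ends once no element arrives for the configured gap. Internally, Flink gives each element its own window [timestamp, timestamp + gap) and merges windows that overlap. The Python sketch below is a simplified model of that merging for the timestamps of a single key, not Flink code; the function name `assign_sessions` is hypothetical, and in this sketch windows that merely touch are merged too.

```python
from typing import List, Tuple


def assign_sessions(timestamps: List[int], gap: int) -> List[Tuple[int, int, int]]:
    """Group the event timestamps (ms) of ONE key into sessions.

    Each event opens a window [ts, ts + gap); a window that overlaps or touches
    the current session is merged into it. Returns (start, end, count) tuples,
    with end exclusive.
    """
    sessions: List[List[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts <= sessions[-1][1]:
            # the new window overlaps the current session: extend it
            sessions[-1][1] = max(sessions[-1][1], ts + gap)
            sessions[-1][2] += 1
        else:
            # more than `gap` since the last event: start a new session
            sessions.append([ts, ts + gap, 1])
    return [(s[0], s[1], s[2]) for s in sessions]


# Example: gap = 1 min; events at 0 s, 30 s and 150 s
print(assign_sessions([0, 30_000, 150_000], 60_000))
# [(0, 90000, 2), (150000, 210000, 1)]
```

The events at 0 s and 30 s fall into one session that ends 1 min after the last of them; the event at 150 s comes more than 1 min later and opens a second session.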
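2. Business Requirement
The requirement in this article is:
Use Flink to process user behavior logs; if a user produces no log record for more than 1 minute, output that user's userId.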
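3. Processing Steps
1. Use Python to simulate user behavior logs and send them to Kafka.
2. Use a Flink stream to process the Kafka data with a session window whose gap time is set to, e.g., 1 min, and find the users who have had no log activity for 1 minute.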
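Note: an event-time session window is only triggered after a later record arrives and advances the watermark past the end of the session (the last event time plus the gap). If no more data arrives, the user is not output, even after more than 1 min of wall-clock time, because the decision cannot be based on the machine's clock: event-time records can be delayed for all kinds of reasons. With parallelism 1, as in the job below, the watermark is shared by all users, so a record from any user can advance it.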
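To see why the output waits for later data, here is a small Python simulation. It is a simplified model under these assumptions: timestamps arrive in ascending order; the watermark is the largest timestamp seen minus 1 ms (what `AscendingTimestampExtractor` emits), updated after every record rather than periodically; and a session [start, end) fires once the watermark reaches end - 1, the window's maxTimestamp used by the event-time trigger. The name `simulate_sessions` is hypothetical.

```python
from typing import List, Optional, Tuple


def simulate_sessions(records: List[Tuple[int, int]], gap: int
                      ) -> Tuple[List[Tuple[int, int, int]], List[List[int]]]:
    """Replay (user_id, event_ts_ms) records in arrival order (ascending timestamps).

    Returns (fired, still_open): fired sessions as (user_id, start, end) in firing
    order, and sessions still waiting for the watermark as [user_id, start, end].
    """
    open_sessions: List[List[int]] = []  # [user_id, start, end), end exclusive
    fired: List[Tuple[int, int, int]] = []
    max_ts: Optional[int] = None
    for user_id, ts in records:
        # 1) window assignment: extend this user's open session if the new
        #    window [ts, ts + gap) overlaps or touches it, else open a new one
        for s in open_sessions:
            if s[0] == user_id and ts <= s[2]:
                s[2] = max(s[2], ts + gap)
                break
        else:
            open_sessions.append([user_id, ts, ts + gap])
        # 2) watermark modeled on AscendingTimestampExtractor: max timestamp - 1
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - 1
        # 3) event-time trigger: a session fires once watermark >= end - 1
        still_open = []
        for s in open_sessions:
            if s[2] - 1 <= watermark:
                fired.append((s[0], s[1], s[2]))
            else:
                still_open.append(s)
        open_sessions = still_open
    return fired, open_sessions


# Without a later record nothing fires, however long we wait in wall-clock time
print(simulate_sessions([(1001, 0), (1002, 20_000), (1001, 30_000)], 60_000))
# A record from another user at 95 s moves the watermark and closes both sessions
print(simulate_sessions([(1001, 0), (1002, 20_000), (1001, 30_000), (1003, 95_000)], 60_000))
```

In the first call both sessions stay open; in the second, the record from user 1003 pushes the watermark to 94999 ms, which is past the ends of the sessions of users 1001 and 1002, so both fire.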
3.1 Python script that simulates user logs and writes them to Kafka (producer)
import datetime
import random
import time

from kafka import KafkaProducer

if __name__ == '__main__':
    user_ids = [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014, 1015]
    pages = ["1.html", "2.html", "3.html", "4.html", "5.html", "6.html", "7.html", "8.html", "9.html", "10.html"]
    p = KafkaProducer(bootstrap_servers="10.105.18.175:9092")
    while True:
        # pick a random user and a random page
        index = random.randint(0, len(user_ids) - 1)
        user_id = user_ids[index]
        page_index = random.randint(0, len(pages) - 1)
        page = pages[page_index]
        # pause index * 2 seconds (0 to 28 s) before sending the record
        time.sleep(index * 2)
        # event time = current time minus 10 s, formatted as yyyy-MM-dd HH:mm:ss
        event_time = (datetime.datetime.now() - datetime.timedelta(seconds=10)).strftime("%Y-%m-%d %H:%M:%S")
        v = '{"userId":%s,"page":"%s","action":"view","userActionTime":"%s"}' % (user_id, page, event_time)
        print(v)
        # send the JSON line to the user_view_log topic and flush it immediately
        p.send("user_view_log", bytes(v, encoding="utf-8"))
        p.flush()
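The script builds the JSON line with `%` string formatting, which works for these fixed values but does no quoting or escaping. A minimal alternative, assuming the same four fields, is to let `json.dumps` build the payload; the helper name `build_event` and its `now` / `delay_seconds` parameters are hypothetical and exist only to make the output reproducible.

```python
import datetime
import json
from typing import Optional


def build_event(user_id: int, page: str, now: Optional[datetime.datetime] = None,
                delay_seconds: int = 10) -> str:
    """Build one log line with the same fields as the producer script.

    json.dumps handles quoting and escaping; userActionTime keeps the
    "%Y-%m-%d %H:%M:%S" format that the Flink job parses.
    """
    if now is None:
        now = datetime.datetime.now()
    # same rule as the script: event time = now minus delay_seconds
    event_time = (now - datetime.timedelta(seconds=delay_seconds)).strftime("%Y-%m-%d %H:%M:%S")
    return json.dumps({"userId": user_id, "page": page, "action": "view",
                       "userActionTime": event_time})


print(build_event(1001, "1.html", now=datetime.datetime(2022, 3, 14, 22, 49, 27)))
# {"userId": 1001, "page": "1.html", "action": "view", "userActionTime": "2022-03-14 22:49:17"}
```

Inside the loop, the send call would then become `p.send("user_view_log", build_event(user_id, page).encode("utf-8"))`. Gson on the Flink side maps the keys to the `UserAction` fields by name, so the extra spaces that `json.dumps` inserts make no difference.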
3.2 Flink streaming program that processes the data with a Session Window
3.2.1 Dependencies in pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
</dependencies>
3.2.2 Flink streaming code
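import com.google.gson.Gson;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import java.text.ParseException;
import java.util.Date;
import java.util.Properties;

public class WindowUserAction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // set the parallelism
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use event time as the stream's time characteristic
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "10.105.18.175:9092");
        p.setProperty("group.id", "test");
        // add source kafka
        DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer09<>("user_view_log", new SimpleStringSchema(), p));
        ds.print(); // print the raw JSON records
        ds.map((MapFunction<String, UserAction>) value -> new Gson().fromJson(value, UserAction.class))
                // assign timestamps and watermarks, using each record's event time (userActionTime)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserAction>() {
                    // AscendingTimestampExtractor expects ascending timestamps; violations are logged by default,
                    // and withViolationHandler(...) can switch to an ignoring or failing handler instead
                    @Override
                    public long extractAscendingTimestamp(UserAction element) {
                        try {
                            Date date = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").parse(element.getUserActionTime());
                            return date.getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                        return 0; // unparseable time: timestamp 0 is far behind the watermark, so the record is likely dropped as late
                    }
                })
                // turn each action into (userId, userActionTime, 1); the trailing 1 is a counter
                .map(new MapFunction<UserAction, Tuple3<Integer, String, Integer>>() {
                    @Override
                    public Tuple3<Integer, String, Integer> map(UserAction value) throws Exception {
                        return new Tuple3<>(value.getUserId(), value.getUserActionTime(), 1);
                    }
                })
                // partition the stream by userId so that every user gets its own sessions
                .keyBy(new KeySelector<Tuple3<Integer, String, Integer>, Integer>() {
                    @Override
                    public Integer getKey(Tuple3<Integer, String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                // session window with a 1 min gap; see the official documentation for the session window concept
                .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
                // allowed lateness: window state is kept for 10 s after the watermark passes the window end;
                // records arriving in that period are still added and the window fires again
                .allowedLateness(Time.seconds(10))
                // count the records of each session, keeping the userId and time of the first argument
                .reduce((ReduceFunction<Tuple3<Integer, String, Integer>>) (value1, value2) -> new Tuple3<>(value1.f0, value1.f1, value1.f2 + value2.f2))
                // keep only sessions that contain a single record
                .filter((FilterFunction<Tuple3<Integer, String, Integer>>) value -> value.f2 < 2)
                .print();
        // call execute to start the job
        env.execute("WindowUserAction");
    }
}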
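Once a session fires, the `reduce` and `filter` steps decide what is printed. The Python sketch below mimics only those two steps on sessions that are already complete. It leaves out Kafka, watermarks and Flink's incremental window state, and the names `reduce_session` and `printed` are hypothetical. Because every record starts with a count of 1, the filter `value.f2 < 2` means that only sessions containing exactly one record are printed; a user whose session had several records before going quiet is not output by this job.

```python
from functools import reduce
from typing import List, Tuple

# (userId, userActionTime, 1): the Tuple3 produced by the second map in the job
Record = Tuple[int, str, int]


def reduce_session(records: List[Record]) -> Record:
    # like the ReduceFunction: keep f0/f1 of the first argument, add up f2
    return reduce(lambda a, b: (a[0], a[1], a[2] + b[2]), records)


def printed(sessions: List[List[Record]]) -> List[Record]:
    # like the FilterFunction value.f2 < 2: only single-record sessions remain
    return [r for r in map(reduce_session, sessions) if r[2] < 2]


s1 = [(1001, "2022-03-14 10:00:00", 1), (1001, "2022-03-14 10:00:30", 1)]
s2 = [(1002, "2022-03-14 10:00:05", 1)]
print(printed([s1, s2]))
# [(1002, '2022-03-14 10:00:05', 1)]
```

User 1001's session is reduced to a count of 2 and filtered out; user 1002's single-record session passes the filter and is printed.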
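3.2.3 The UserAction class
import java.io.Serializable;

public class UserAction implements Serializable {
    private int userId;
    private String page;
    private String action;
    private String userActionTime;

    // getters used by WindowUserAction; the remaining getters and setters are omitted
    public int getUserId() { return userId; }
    public String getUserActionTime() { return userActionTime; }
}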
Run both programs (the Python producer and the Flink job) and watch the printed output.