Flink DataStream KeyedProcessFunction实现类似Session Window功能
Flink DataStream KeyedProcessFunction实现类似Session Window功能
一、KeyedProcessFunction功能介绍
KeyProcessFunction是一个低级的Stream处理操作(Operator),可以更加灵活的处理Stream的业务逻辑,它可以访问所有流应用程序的基本构建块:
- 事件(Event)
- 状态(State,仅支持KeyedStream)
- 计时器(支持EventTimeTimer(事件时间定时器)和ProcessingTimeTimer(处理时间定时器),仅支持KeyedStream)
KeyProcessFunction的方法介绍:
-
processElement(I value, Context ctx, Collector<O> out)
- 参数1:每个Element元素对象
- 参数2:为KeyedProcessFunction中定义的上下文对象
- 参数3:收集器
-
onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
- 参数1:触发定时器的时间
- 参数2:定时器的上下文
- 参数3:收集器,一般收集定时器触发后的数据。
二、KeyedProcessFunction示例
我们有这样一个需求:需要获取用户15分钟后未浏览商品页面,需要将这个用户捞出,给这些用户推送一些优惠商品。
这个场景,可以使用Session Window
基于Event Time来做,但是有个问题,如果基于Event Time,在下一条水印没来到,不会触发当前定时器,假设在凌晨2点到4点,没有用户的日志行为,那么在凌晨1点45分到2点之间的符合条件的用户,就不会捞出。这个问题我们后来采用了ProcessingTimeTrigger
来进行触发计算。也请教过一些大神,说可以通过KeyProcessFunction
实现。
下面看示例代码,使用KeyedProcessFunction
实现类似Session Window
的功能。
ProcessFunctionTest类,Stream程序实现逻辑
public class ProcessFunctionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 基于Event Time计算
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
Properties p = new Properties();
p.setProperty("bootstrap.servers", "localhost:9092");
DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("user_view_log", new SimpleStringSchema(), p));
ds.print();
ds
.map(new MapFunction<String, UserAction>() {
@Override
public UserAction map(String value) throws Exception {
return new Gson().fromJson(value, UserAction.class);
}
})
// 生成水印
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserAction>() {
@Override
public long extractAscendingTimestamp(UserAction element) {
try {
return element.getUserActionTime();
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
})
.keyBy(new KeySelector<UserAction, Integer>() {
@Override
public Integer getKey(UserAction value) throws Exception {
return value.getUserId();
}
})
// CountWithTimeoutEventTimeFunction:注册的是EventTime注册器
// CountWithTimeoutProcessingTimeFunction:注册的是ProcessingTime注册器
.process(new CountWithTimeoutProcessingTimeFunction())//使用process方法计算
.print();
env.execute("ProcessFunctionTest");
}
}
CountWithTimeoutProcessingTimeFunction类,继承KeyedProcessFunction,使用Processing Time的定时器,可以解决上面没有事件进入的问题。
public class CountWithTimeoutProcessingTimeFunction
extends KeyedProcessFunction<Integer, UserAction, Tuple2<Integer, Long>> {
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(
UserAction value,
Context ctx,
Collector<Tuple2<Integer, Long>> out) throws Exception {
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.getUserId();
}
current.count++;
current.lastModified = ctx.timestamp();
state.update(current);
// 注册ProcessingTime处理时间的定时器
ctx.timerService().registerProcessingTimeTimer(current.lastModified + 60000L);
}
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<Integer, Long>> out) throws Exception {
CountWithTimestamp result = state.value();
if (timestamp == result.lastModified + 60000L) {
out.collect(new Tuple2<Integer, Long>(result.key, result.count));
state.update(null);
ctx.timerService().deleteEventTimeTimer(timestamp);
}
}
}
CountWithTimeoutEventTimeFunction类,继承KeyedProcessFunction,使用的是Event Time的定时器。类似使用Session Window的EventTimeTrigger。这个需要下一条水印来到,才会触发定时器的。
/**
* registerEventTimeTimer 是基于Event Time注册定时器,触发时需要下一条数据的水印作对比
* 需要查看相差的时间是否大于等于interval,大于等于interval则触发,因为已经到时间了,小于interval,不然不会触发定时器,因为没到指定的时间
* 如果下一条水印不来,就没办法对比时间间隔是否大于interval,也就不会触发
* <p>
* 与registerProcessingTimeTimer不同,这个是与机器时间做比对,所以一到interval,就会进行触发操作
*/
public class CountWithTimeoutEventTimeFunction
extends KeyedProcessFunction<Integer, UserAction, Tuple2<Integer, Long>> {
private ValueState<CountWithTimestamp> state;
private final long interval = 60000L;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(
UserAction value,
Context ctx,
Collector<Tuple2<Integer, Long>> out) throws Exception {
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.getUserId();
}
current.count++;
current.lastModified = ctx.timestamp();
state.update(current);
// 注册EventTime处理时间的定时器
ctx.timerService().registerEventTimeTimer(current.lastModified + interval);
}
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<Integer, Long>> out) throws Exception {
CountWithTimestamp result = state.value();
if (timestamp == result.lastModified + interval) {
out.collect(new Tuple2<Integer, Long>(result.key, result.count));
state.update(null);
ctx.timerService().deleteEventTimeTimer(timestamp);
}
}
}
CountWithTimestamp类,JavaBean
public class CountWithTimestamp {
public Integer key;
public long count;
public long lastModified;
@Override
public String toString() {
super.toString();
}
}
UserAction类,JavaBean
public class UserAction implements Serializable {
private int userId;
private String page;
private String action;
private long userActionTime;
// 省略get set方法
}
上一篇: 如何使用Cookie来保存用户的登录信息
下一篇: re模块的简单使用