欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink DataStream KeyedProcessFunction实现类似Session Window功能

程序员文章站 2024-03-19 22:31:34
...

Flink DataStream KeyedProcessFunction实现类似Session Window功能

一、KeyedProcessFunction功能介绍

KeyProcessFunction是一个低级的Stream处理操作(Operator),可以更加灵活的处理Stream的业务逻辑,它可以访问所有流应用程序的基本构建块:

  • 事件(Event)
  • 状态(State,仅支持KeyedStream)
  • 计时器(支持EventTimeTimer(事件时间定时器)和ProcessingTimeTimer(处理时间定时器),仅支持KeyedStream)

KeyProcessFunction的方法介绍:

  • processElement(I value, Context ctx, Collector<O> out)
    • 参数1:每个Element元素对象
    • 参数2:为KeyedProcessFunction中定义的上下文对象
    • 参数3:收集器
  • onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
    • 参数1:触发定时器的时间
    • 参数2:定时器的上下文
    • 参数3:收集器,一般收集定时器触发后的数据。

二、KeyedProcessFunction示例

我们有这样一个需求:需要获取用户15分钟后未浏览商品页面,需要将这个用户捞出,给这些用户推送一些优惠商品。

这个场景,可以使用Session Window基于Event Time来做,但是有个问题,如果基于Event Time,在下一条水印没来到,不会触发当前定时器,假设在凌晨2点到4点,没有用户的日志行为,那么在凌晨1点45分到2点之间的符合条件的用户,就不会捞出。这个问题我们后来采用了ProcessingTimeTrigger来进行触发计算。也请教过一些大神,说可以通过KeyProcessFunction实现。

下面看示例代码,使用KeyedProcessFunction实现类似Session Window的功能。

ProcessFunctionTest类,Stream程序实现逻辑

public class ProcessFunctionTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 基于Event Time计算
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("user_view_log", new SimpleStringSchema(), p));
        ds.print();

        ds
                .map(new MapFunction<String, UserAction>() {
                    @Override
                    public UserAction map(String value) throws Exception {
                        return new Gson().fromJson(value, UserAction.class);
                    }
                })
            	// 生成水印
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserAction>() {
                    @Override
                    public long extractAscendingTimestamp(UserAction element) {
                        try {
                            return element.getUserActionTime();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return 0;
                    }
                })
                .keyBy(new KeySelector<UserAction, Integer>() {
                    @Override
                    public Integer getKey(UserAction value) throws Exception {
                        return value.getUserId();
                    }
                })
                // CountWithTimeoutEventTimeFunction:注册的是EventTime注册器
                // CountWithTimeoutProcessingTimeFunction:注册的是ProcessingTime注册器
                .process(new CountWithTimeoutProcessingTimeFunction())//使用process方法计算
                .print();

        env.execute("ProcessFunctionTest");
    }
}

CountWithTimeoutProcessingTimeFunction类,继承KeyedProcessFunction,使用Processing Time的定时器,可以解决上面没有事件进入的问题。

public class CountWithTimeoutProcessingTimeFunction
        extends KeyedProcessFunction<Integer, UserAction, Tuple2<Integer, Long>> {

    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(
            UserAction value,
            Context ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        CountWithTimestamp current = state.value();

        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.getUserId();
        }

        current.count++;
        current.lastModified = ctx.timestamp();

        state.update(current);
		// 注册ProcessingTime处理时间的定时器
        ctx.timerService().registerProcessingTimeTimer(current.lastModified + 60000L);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        CountWithTimestamp result = state.value();

        if (timestamp == result.lastModified + 60000L) {
            out.collect(new Tuple2<Integer, Long>(result.key, result.count));
            state.update(null);
            ctx.timerService().deleteEventTimeTimer(timestamp);
        }
    }
}

CountWithTimeoutEventTimeFunction类,继承KeyedProcessFunction,使用的是Event Time的定时器。类似使用Session Window的EventTimeTrigger。这个需要下一条水印来到,才会触发定时器的。

/**
 * registerEventTimeTimer 是基于Event Time注册定时器,触发时需要下一条数据的水印作对比
 * 需要查看相差的时间是否大于等于interval,大于等于interval则触发,因为已经到时间了,小于interval,不然不会触发定时器,因为没到指定的时间
 * 如果下一条水印不来,就没办法对比时间间隔是否大于interval,也就不会触发
 * <p>
 * 与registerProcessingTimeTimer不同,这个是与机器时间做比对,所以一到interval,就会进行触发操作
 */
public class CountWithTimeoutEventTimeFunction
        extends KeyedProcessFunction<Integer, UserAction, Tuple2<Integer, Long>> {

    private ValueState<CountWithTimestamp> state;
    private final long interval = 60000L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(
            UserAction value,
            Context ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        CountWithTimestamp current = state.value();

        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.getUserId();
        }

        current.count++;
        current.lastModified = ctx.timestamp();

        state.update(current);
		// 注册EventTime处理时间的定时器
        ctx.timerService().registerEventTimeTimer(current.lastModified + interval);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<Integer, Long>> out) throws Exception {

        CountWithTimestamp result = state.value();

        if (timestamp == result.lastModified + interval) {
            out.collect(new Tuple2<Integer, Long>(result.key, result.count));
            state.update(null);
            ctx.timerService().deleteEventTimeTimer(timestamp);
        }
    }
}

CountWithTimestamp类,JavaBean

public class CountWithTimestamp {
    public Integer key;
    public long count;
    public long lastModified;

    @Override
    public String toString() {
        super.toString();
    }
}

UserAction类,JavaBean

public class UserAction implements Serializable {
    private int userId;
    private String page;
    private String action;
    private long userActionTime;
    
    // 省略get set方法
}