A simple use of Flink's Session Window

程序员文章站 2022-03-14 22:49:27

1. The Session Window concept

See the official documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows


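2. Business requirement

The requirement in this article is:

Process user behavior logs with Flink; if a user produces no log records for more than 1 minute, output that user's userId.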
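
As a rough offline illustration of this requirement, the sketch below groups one user's event timestamps into sessions, starting a new session whenever the gap to the previous event exceeds 1 minute. The helper name `split_sessions` and the millisecond timestamps are assumptions made for illustration; this is plain Python, not Flink's implementation.

```python
def split_sessions(timestamps_ms, gap_ms=60_000):
    """Group event timestamps (ms) into sessions separated by gaps > gap_ms."""
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts - sessions[-1][-1] <= gap_ms:
            # Close enough to the previous event: same session.
            sessions[-1].append(ts)
        else:
            # First event, or silence longer than the gap: new session.
            sessions.append([ts])
    return sessions


# Hypothetical timestamps for one user: three quick views, a long pause, two more.
print(split_sessions([0, 20_000, 50_000, 200_000, 230_000]))
```

Each boundary between two sessions marks a moment at which the user had been silent for longer than the gap.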

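3. Processing steps

1) Use Python to simulate user behavior logs and send them to Kafka.

2) Use a Flink streaming job to consume the Kafka data with a session window whose gap time is set to, for example, 1 minute, and find the users who have had no log activity for 1 minute.

Note: when a session window is driven by event-time watermarks, it fires only after a later record arrives and advances the watermark. If no further data arrives, the user is not output, even after more than 1 minute of wall-clock time has passed. The machine's clock cannot be used for this decision, because event-time records may be delayed for a variety of reasons.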
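
The toy simulation below, in plain Python rather than Flink code, may help show why a session fires only when a later record arrives. It assumes records arrive in ascending timestamp order and that the watermark trails the newest timestamp by 1 ms; the class `SessionSimulator` and its fields are hypothetical names used only for this sketch.

```python
GAP_MS = 60_000  # session gap of 1 minute, as in the article


class SessionSimulator:
    """Toy event-time session windows driven only by record timestamps."""

    def __init__(self, gap_ms=GAP_MS):
        self.gap_ms = gap_ms
        self.watermark = float("-inf")
        self.open_sessions = {}  # user_id -> [first_ts, last_ts, count]

    def on_event(self, user_id, ts_ms):
        """Process one record and return the sessions that fire because of it."""
        # The watermark moves only when a record arrives (assumed: ts - 1).
        self.watermark = max(self.watermark, ts_ms - 1)

        # Extend the user's open session if the record falls within the gap.
        session = self.open_sessions.get(user_id)
        merged = session is not None and ts_ms - session[1] <= self.gap_ms
        if merged:
            session[1] = ts_ms
            session[2] += 1

        # A session covers [first, last + gap); it fires once the watermark
        # reaches the last millisecond inside that range.
        fired = []
        for uid, (first, last, count) in list(self.open_sessions.items()):
            if self.watermark >= last + self.gap_ms - 1:
                fired.append((uid, first, last, count))
                del self.open_sessions[uid]

        if not merged:
            self.open_sessions[user_id] = [ts_ms, ts_ms, 1]
        return fired


sim = SessionSimulator()
print(sim.on_event(1001, 0))        # nothing fires yet
print(sim.on_event(1001, 30_000))   # still inside the same session
# However long we wait here, nothing fires until another record arrives.
print(sim.on_event(1002, 100_000))  # this record closes user 1001's session
```

In this sketch it is the record from user 1002 that pushes the watermark past the end of user 1001's session; without it, the session would stay open indefinitely.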


3.1 Python script that simulates user logs and writes them to Kafka (producer)

import datetime
import random
import time

from kafka import KafkaProducer  # provided by the kafka-python package

if __name__ == '__main__':

    user_ids = [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014, 1015]
    pages = ["1.html", "2.html", "3.html", "4.html", "5.html", "6.html", "7.html", "8.html", "9.html", "10.html"]
    p = KafkaProducer(bootstrap_servers="10.105.18.175:9092")
    while True:
        # Pick a random user and a random page.
        index = random.randint(0, len(user_ids) - 1)
        user_id = user_ids[index]
        page_index = random.randint(0, len(pages) - 1)
        page = pages[page_index]
        # Pause for index * 2 seconds (0 to 28) before sending the record.
        time.sleep(index * 2)
        # The event time is the current time minus 10 seconds.
        event_time = (datetime.datetime.now() - datetime.timedelta(seconds=10)).strftime("%Y-%m-%d %H:%M:%S")
        v = '{"userId":%s,"page":"%s","action":"view","userActionTime":"%s"}' % (user_id, page, event_time)
        print(v)
        # Send the JSON line to the user_view_log topic and flush immediately.
        p.send("user_view_log", v.encode("utf-8"))
        p.flush()
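
As an optional variation, the hand-formatted JSON string could be built with `json.dumps`, which takes care of quoting. The sketch below is only an illustration: the helper `build_record` and its `delay_seconds` parameter are hypothetical, and the field names are copied from the script above.

```python
import datetime
import json


def build_record(user_id, page, now, delay_seconds=10):
    """Return the UTF-8 encoded JSON line for one simulated page view."""
    # Event time is `now` minus the delay, formatted as in the script above.
    event_time = (now - datetime.timedelta(seconds=delay_seconds)).strftime("%Y-%m-%d %H:%M:%S")
    record = {
        "userId": user_id,
        "page": page,
        "action": "view",
        "userActionTime": event_time,
    }
    return json.dumps(record).encode("utf-8")


print(build_record(1001, "1.html", datetime.datetime.now()))
```

Passing `now` as an argument keeps the helper deterministic, which makes it easy to check without a running Kafka broker.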

3.2 Flink streaming program that processes the data with a Session Window

3.2.1 Dependencies in pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.8.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.8.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
</dependencies>

3.2.2 Flink Stream code

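import com.google.gson.Gson;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

import java.text.ParseException;
import java.util.Date;
import java.util.Properties;

public class WindowUserAction {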

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // set the parallelism
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // set the stream time characteristic to event time

        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "10.105.18.175:9092");
        p.setProperty("group.id", "test");
        // add source kafka
        DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer09<>("user_view_log", new SimpleStringSchema(), p));
        ds.print();

        ds.map((MapFunction<String, UserAction>) value -> new Gson().fromJson(value, UserAction.class))
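                // Assign timestamps and watermarks based on the event time.
                // AscendingTimestampExtractor assumes timestamps arrive in ascending order. Violations are logged
                // by default; withViolationHandler can install a custom, ignoring, or failing handler instead.
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserAction>() {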
                    @Override
                    public long extractAscendingTimestamp(UserAction element) {
                        try {
                            Date date = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").parse(element.getUserActionTime());
                            return date.getTime();
                        } catch (ParseException e) {
                            // Unparsable time: print the error and fall back to timestamp 0 below.
                            e.printStackTrace();
                        }
                        return 0;
                    }
                })
                .map(new MapFunction<UserAction, Tuple3<Integer, String, Integer>>() {
                    @Override
                    public Tuple3<Integer, String, Integer> map(UserAction value) throws Exception {
                        return new Tuple3<>(value.getUserId(), value.getUserActionTime(), 1);
                    }
                })
                .keyBy(new KeySelector<Tuple3<Integer, String, Integer>, Integer>() {
                    @Override
                    public Integer getKey(Tuple3<Integer, String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
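                // Session window with a gap time of 1 minute; see the official documentation for the concept.
                .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
                // Allowed lateness of 10 seconds: a record that arrives after the watermark has passed the end of
                // its window is still included in that window if it is no more than 10 seconds late.
                .allowedLateness(Time.seconds(10))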
                .reduce((ReduceFunction<Tuple3<Integer, String, Integer>>) (value1, value2) -> new Tuple3<>(value1.f0, value1.f1, value1.f2 + value2.f2))
                .filter((FilterFunction<Tuple3<Integer, String, Integer>>) value -> value.f2 < 2)
                .print();

        // Call execute to start the job
        env.execute("WindowUserAction");
    }
}
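
To make the reduce and filter steps of the job above easier to follow, here is a small plain-Python mirror of what they compute for one closed session. The function names `merge` and `session_output` are hypothetical; the tuple layout (userId, userActionTime, count) follows the `Tuple3` used in the Java code. Read this way, the job prints only sessions that contain a single record.

```python
from functools import reduce


def merge(a, b):
    """Mirror of the ReduceFunction: keep a's id and time, add the counts."""
    return (a[0], a[1], a[2] + b[2])


def session_output(records):
    """Reduce one closed session, then apply the `count < 2` filter."""
    total = reduce(merge, records)
    return total if total[2] < 2 else None


# Hypothetical sessions: one with a single view, one with three views.
print(session_output([(1001, "2022-03-14 22:49:27", 1)]))
print(session_output([(1002, "2022-03-14 22:49:30", 1),
                      (1002, "2022-03-14 22:49:45", 1),
                      (1002, "2022-03-14 22:50:10", 1)]))
```

If the intent is to report every user whose session ended, the filter condition would need to change; that is a design decision left to the reader.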

3.2.3 The UserAction class

import java.io.Serializable;

public class UserAction implements Serializable {
    private int userId;
    private String page;
    private String action;
    private String userActionTime;

    // getters and setters omitted
}
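
For readers who want to check the record format without running Flink, the sketch below mirrors in plain Python what the job does with each line: parse the JSON into an object with the same four fields, then derive an epoch-millisecond timestamp from `userActionTime`. The helper names `parse_user_action` and `event_time_ms` are assumptions for illustration, and the time string is interpreted in the local time zone.

```python
import json
from dataclasses import dataclass
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # same pattern as yyyy-MM-dd HH:mm:ss


@dataclass
class UserAction:
    userId: int
    page: str
    action: str
    userActionTime: str


def parse_user_action(line):
    """Build a UserAction from one JSON log line."""
    return UserAction(**json.loads(line))


def event_time_ms(action):
    """Epoch milliseconds of userActionTime, read in the local time zone."""
    parsed = datetime.strptime(action.userActionTime, TIME_FORMAT)
    return int(parsed.timestamp() * 1000)


line = '{"userId":1001,"page":"1.html","action":"view","userActionTime":"2022-03-14 22:49:27"}'
action = parse_user_action(line)
print(action, event_time_ms(action))
```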

Run each of the programs above and watch the logged output.