欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

flink-table-sql-demo1

程序员文章站 2022-06-07 22:58:01
...

一.背景

     flink 这个东西,后面会尝试走纯SQL 统计路线,这个阿里和华为都搞了一套,这里就简单记录下测试效果。

用SQL统计用户点击数,每隔5秒统计一次。暂时去掉了复杂逻辑。

 

二.直接看代码

 

// lombok 插件,这里主要写一个简单的数据产生的对象
// 表是时间,用户,以及商品3个字段
@Data
@ToString
public class UserInfo implements Serializable {
    private Timestamp pTime;
    private String userId;
    private String itemId;

    public UserInfo() {
    }

    public UserInfo(String userId, String itemId) {
        this.userId = userId;
        this.itemId = itemId;
        this.pTime = new Timestamp(System.currentTimeMillis());
    }
}

 

/**
 * 模拟数据产生,没隔1秒发送一个数据
 * @author <a href="mailto:huoguo@2dfire.com">火锅</a>
 * @time 2019/2/22
 */
public class UserDataSource implements SourceFunction<UserInfo> {
    static String[] items = {"i-1", "i-2", "i-3"};
    static String[] users = {"a", "b", "c"};
    @Override
    public void run(SourceContext sc) throws Exception {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            int m = (int) (System.currentTimeMillis() % 3);
            sc.collect(new UserInfo(users[m], items[m]));
        }
    }
    @Override
    public void cancel() {
        System.out.println("cancel to do ...");
    }
}

 

 

/**
 * 这就主函数,负责统计,引用是java的,别引错了
 */
public class UserApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserInfo> userInfoDataStream = env.addSource(new UserDataSource());

        DataStream<UserInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserInfo>() {
            @Override
            public long extractAscendingTimestamp(UserInfo element) {
                return element.getPTime().getTime();
            }
        });
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // pTime.rowtime  = pTime as rowTime(proctime)
        tableEnv.registerDataStream("test", timedData, "userId,itemId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

}

 

结果:

c,2019-03-06 13:55:20.0,4

 

a,2019-03-06 13:55:20.0,1

--- 手动分开好看

c,2019-03-06 13:55:25.0,2

b,2019-03-06 13:55:25.0,2

a,2019-03-06 13:55:25.0,1

---

a,2019-03-06 13:55:30.0,2

c,2019-03-06 13:55:30.0,2

b,2019-03-06 13:55:30.0,1

 

小结:

        1.demo很简单,仅为了测试使用

        2.具体的原理,和很多东西 有时间再写吧