flink-table-sql-demo1
程序员文章站
2022-06-07 23:12:11
...
一.背景
flink 这个东西,后面会尝试走纯SQL 统计路线,这个阿里和华为都搞了一套,这里就简单记录下测试效果。
用SQL统计用户点击数,每隔5秒统计一次。暂时去掉了复杂逻辑。
二.直接看代码
// lombok 插件,这里主要写一个简单的数据产生的对象 // 表是时间,用户,以及商品3个字段 @Data @ToString public class UserInfo implements Serializable { private Timestamp pTime; private String userId; private String itemId; public UserInfo() { } public UserInfo(String userId, String itemId) { this.userId = userId; this.itemId = itemId; this.pTime = new Timestamp(System.currentTimeMillis()); } }
/** * 模拟数据产生,没隔1秒发送一个数据 * @author <a href="mailto:huoguo@2dfire.com">火锅</a> * @time 2019/2/22 */ public class UserDataSource implements SourceFunction<UserInfo> { static String[] items = {"i-1", "i-2", "i-3"}; static String[] users = {"a", "b", "c"}; @Override public void run(SourceContext sc) throws Exception { while (true) { TimeUnit.SECONDS.sleep(1); int m = (int) (System.currentTimeMillis() % 3); sc.collect(new UserInfo(users[m], items[m])); } } @Override public void cancel() { System.out.println("cancel to do ..."); } }
/** * 这就主函数,负责统计,引用是java的,别引错了 */ public class UserApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<UserInfo> userInfoDataStream = env.addSource(new UserDataSource()); DataStream<UserInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserInfo>() { @Override public long extractAscendingTimestamp(UserInfo element) { return element.getPTime().getTime(); } }); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // pTime.rowtime = pTime as rowTime(proctime) tableEnv.registerDataStream("test", timedData, "userId,itemId,pTime.rowtime"); Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" + " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId "); // deal with (Tuple2<Boolean, Row> value) -> out.collect(row) SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class) .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> { out.collect(value.f1); }).returns(Row.class); // add sink or print allClick.print(); env.execute("test"); } }
结果:
c,2019-03-06 13:55:20.0,4
a,2019-03-06 13:55:20.0,1
--- 手动分开好看
c,2019-03-06 13:55:25.0,2
b,2019-03-06 13:55:25.0,2
a,2019-03-06 13:55:25.0,1
---
a,2019-03-06 13:55:30.0,2
c,2019-03-06 13:55:30.0,2
b,2019-03-06 13:55:30.0,1
小结:
1.demo很简单,仅为了测试使用
2.具体的原理,和很多东西 有时间再写吧