Flink interval join: different time windows producing identical results
Problem description:
With an interval join, the 10-minute window and the 20-minute window produce identical results.
Requirement:
One stream of consumption records, with card_num and amount as the key fields.
Required output:
card_num,
total amount spent within the last 10 minutes,
total amount spent within the last 20 minutes.
The results obtained for the two windows are identical; the computation only follows one of the window sizes.
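For context, here is a minimal sketch of that direct approach: two views over the same source stream, differing only in the OVER window length, interval joined with each other. The table source_table_1 and its fields (card_num, amount, et) are assumptions here, matching what the template further down references; according to the post, both output columns of this query come back with the same value.
-- Direct approach (problematic): two views derived from the same stream
create view view_10s as
select card_num, et, sum(amount) over w as amount_10s
from source_table_1
window w as (partition by card_num order by et range between interval '10' second preceding and current row);
create view view_20s as
select card_num, et, sum(amount) over w as amount_20s
from source_table_1
window w as (partition by card_num order by et range between interval '20' second preceding and current row);
-- Interval joining the two views directly yields identical values in both columns
select a.card_num, a.et, a.amount_10s, b.amount_20s
from view_10s a, view_20s b
where a.card_num = b.card_num
and a.et between b.et - interval '2' second and b.et + interval '2' second;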
Solution:
Write the 10-minute window result to kafka_table_1,
write the 20-minute window result to kafka_table_2,
then interval join kafka_table_1 with kafka_table_2.
Below is a template (for illustration it uses 10-second and 20-second windows, with source_table_8 and source_table_9 as the intermediate Kafka tables).
-- source tables
CREATE TABLE source_table_9 (
card_num STRING,
ts BIGINT,
mcount BIGINT NOT NULL,
avgamount BIGINT,
et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR et AS et - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'source_table_9',
'properties.group.id'='dev_flink',
'properties.zookeeper.connect'='xxxxx:2181',
'properties.bootstrap.servers' = 'xxxxx:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE source_table_8 (
card_num STRING,
ts BIGINT,
mcount BIGINT NOT NULL,
avgamount BIGINT,
et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR et AS et - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'source_table_8',
'properties.group.id'='dev_flink',
'properties.zookeeper.connect'='xxxxx:2181',
'properties.bootstrap.servers' = 'xxxxx:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
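The INSERT statements below read from source_table_1, whose DDL is not shown in the original post. A sketch of what it might look like, assuming only the fields the template actually references (card_num, txn_cd, amount, ts, and the derived event time et):
-- Assumed DDL for source_table_1 (not included in the original post)
CREATE TABLE source_table_1 (
card_num STRING,
txn_cd STRING,
amount BIGINT,
ts BIGINT,
et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR et AS et - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kafka',
'topic' = 'source_table_1',
'properties.group.id'='dev_flink',
'properties.bootstrap.servers' = 'xxxxx:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);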
-- insert the 10-second window aggregates into Kafka
insert into source_table_8
select
card_num,
ts,
count(txn_cd) over w as mcount,
avg(amount) over w as avgamount
from source_table_1
window w as (partition by card_num order by et range between interval '10' second preceding and current row);
-- insert the 20-second window aggregates into Kafka
insert into source_table_9
select
card_num,
ts,
count(txn_cd) over w as mcount,
avg(amount) over w as avgamount
from source_table_1
window w as (partition by card_num order by et range between interval '20' second preceding and current row);
-- interval join the 10s and 20s streams
create view mid_table_3 as
select
c.card_num as card_num,
c.et as et,
d.mcount as ten_sec_count,    -- from source_table_8 (10s window)
c.mcount as twenty_sec_count  -- from source_table_9 (20s window)
from source_table_8 d,source_table_9 c
where c.card_num=d.card_num
and c.et BETWEEN d.et - INTERVAL '2' second AND d.et + INTERVAL '2' second;
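To actually emit the joined result, a sink would be added; the print sink result_table below is hypothetical and only shows how mid_table_3 could be consumed:
-- Hypothetical sink, for illustration only
CREATE TABLE result_table (
card_num STRING,
et TIMESTAMP(3),
ten_sec_count BIGINT,
twenty_sec_count BIGINT
)
WITH (
'connector' = 'print'
);
insert into result_table
select card_num, et, ten_sec_count, twenty_sec_count
from mid_table_3;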
Open question:
Why can't the two views be interval joined directly?
My guess is that during an interval join each input stream keeps only one keyed state, so even though they are two different views, they are still derived from the same stream, and therefore the computed results come out the same. (I haven't read the source code; this is just my current thinking. Corrections from more experienced readers are welcome!)