
Flink interval join: identical results from different time windows


Problem description:

With an interval join, the 10-minute window and the 20-minute window return identical data.

Requirement:

One stream of consumption data, whose main fields are card_num and amount.

Required output:

card_num,
total amount spent in the last 10 minutes,
total amount spent in the last 20 minutes.

The results from the different windows come out the same: the calculation only follows one of the window sizes.
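For reference, the query pattern that produced identical results looked roughly like the sketch below. This is a reconstruction, not the original job: the raw stream source_table_1 and its fields (card_num, txn_cd, amount, ts, event time et) are assumed to match the template further down.

-- Reconstructed sketch of the direct approach: two OVER-window views on the same stream,
-- then an interval join between them.
create view ten_sec_view as
select
    card_num,
    et,
    sum(amount) over w as ten_sec_amount
from source_table_1
window w as (partition by card_num order by et range between interval '10' second preceding and current row);

create view twenty_sec_view as
select
    card_num,
    et,
    sum(amount) over w as twenty_sec_amount
from source_table_1
window w as (partition by card_num order by et range between interval '20' second preceding and current row);

-- Interval join of the two views: both amount columns come back with the same value.
select
    c.card_num,
    c.ten_sec_amount,
    d.twenty_sec_amount
from ten_sec_view c, twenty_sec_view d
where c.card_num = d.card_num
  and c.et between d.et - interval '2' second and d.et + interval '2' second;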

Solution:
Write the 10-minute window result to kafka_table_1.
Write the 20-minute window result to kafka_table_2.
Then interval join kafka_table_1 with kafka_table_2.

Below is a template (it uses 10-second and 20-second windows, but the pattern is the same for 10 and 20 minutes).

-- intermediate Kafka tables: each window's result is written here, then read back for the join
CREATE TABLE source_table_9 (
      card_num STRING,
      ts BIGINT,
      mcount BIGINT NOT NULL,
      avgamount  BIGINT,
      et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
      WATERMARK FOR et AS et - INTERVAL '5' SECOND
    )
    WITH (
      'connector' = 'kafka',
      'topic' = 'source_table_9',
      'properties.group.id'='dev_flink',
      'properties.zookeeper.connect'='xxxxx:2181',
      'properties.bootstrap.servers' = 'xxxxx:9092',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
      );
	  
CREATE TABLE source_table_8 (
      card_num STRING,
      ts BIGINT,
      mcount BIGINT NOT NULL,
      avgamount  BIGINT,
      et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
      WATERMARK FOR et AS et - INTERVAL '5' SECOND
    )
    WITH (
      'connector' = 'kafka',
      'topic' = 'source_table_8',
      'properties.group.id'='dev_flink',
      'properties.zookeeper.connect'='xxxxx:2181',
      'properties.bootstrap.servers' = 'xxxxx:9092',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
      );
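The INSERT statements below read from source_table_1, which the original post does not define. A plausible definition, inferred from the fields the template uses (card_num, txn_cd, amount, ts and the event time et), could look like the following; the topic name and connector properties are placeholders.

-- assumed definition of the raw consumption stream (not part of the original post)
CREATE TABLE source_table_1 (
      card_num STRING,
      txn_cd STRING,
      amount BIGINT,
      ts BIGINT,
      et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')),
      WATERMARK FOR et AS et - INTERVAL '5' SECOND
    )
    WITH (
      'connector' = 'kafka',
      'topic' = 'source_table_1',
      'properties.group.id'='dev_flink',
      'properties.bootstrap.servers' = 'xxxxx:9092',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
      );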
	  
-- write the 10-second window aggregation to Kafka (source_table_8)
insert into source_table_8
select
    card_num,
    ts,
    count(txn_cd) over w                               as  mcount,
    avg(amount) over w                               as  avgamount
from source_table_1
window w as (partition by card_num order by et range between interval '10' second preceding and current row);


-- write the 20-second window aggregation to Kafka (source_table_9)
insert into source_table_9
select
    card_num,
    ts,
    count(txn_cd) over w                               as  mcount,
    avg(amount) over w                               as  avgamount
from source_table_1
window w as (partition by card_num order by et range between interval '20' second preceding and current row);


-- interval join of the 10s and 20s results
create view mid_table_3 as
select
    c.card_num as card_num,
    c.et as et,
    d.mcount as ten_sec_count,
    c.mcount as twenty_sec_count
from source_table_8 d, source_table_9 c
where c.card_num = d.card_num
and c.et BETWEEN d.et - INTERVAL '2' second AND d.et + INTERVAL '2' second;
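The template stops at the view; to actually emit the joined result, it still needs a sink. A possible sink table and INSERT, assuming a Kafka result topic (the table name and topic are placeholders, and the column names match the view above):

-- assumed result sink (not part of the original post)
CREATE TABLE result_table (
      card_num STRING,
      et TIMESTAMP(3),
      ten_sec_count BIGINT,
      twenty_sec_count BIGINT
    )
    WITH (
      'connector' = 'kafka',
      'topic' = 'result_table',
      'properties.bootstrap.servers' = 'xxxxx:9092',
      'format' = 'json'
      );

insert into result_table
select card_num, et, ten_sec_count, twenty_sec_count from mid_table_3;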

Thoughts on the problem:
Why can't the two views be interval joined directly?
Possibly because during an interval join a stream only has one keyed state, so even though they are two different views, they are backed by the same stream, which is why the results come out identical. (I have not read the source code; this is only my current guess. Pointers from anyone who knows the internals are welcome!)
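One way to test this guess without reading the source code is to ask the planner for its plan and check whether the two OVER aggregations survive as separate operators. A minimal sketch, reusing the two views from the reconstructed direct approach above:

-- print the optimized plan of the direct two-view interval join
EXPLAIN PLAN FOR
select
    c.card_num,
    c.ten_sec_amount,
    d.twenty_sec_amount
from ten_sec_view c, twenty_sec_view d
where c.card_num = d.card_num
  and c.et between d.et - interval '2' second and d.et + interval '2' second;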

Tags: flink, interval join