
Possibly the Most Feature-Complete Flink SQL Demo Ever, Part 2

程序员文章站 2022-06-19 11:24:04
...

This post continues the previous article, "Possibly the Most Feature-Complete Flink SQL Demo Ever, Part 1".

Five Ways to Join Tables in Flink SQL

Regular join of static tables

A regular static-table join joins a static table with another static table.

Example: for a specific date, show the number of customers and orders, broken down by region and order priority.

-- Join the orders table dev_orders (a static table stored on S3) with MySQL tables
SET execution.type=batch;
USE CATALOG hive;
SELECT
  r_name AS `region`,
  o_orderpriority AS `priority`,
  COUNT(DISTINCT c_custkey) AS `number_of_customers`,
  COUNT(o_orderkey) AS `number_of_orders`
FROM dev_orders
JOIN prod_customer ON o_custkey = c_custkey
JOIN prod_nation ON c_nationkey = n_nationkey
JOIN prod_region ON n_regionkey = r_regionkey
WHERE
  FLOOR(o_ordertime TO DAY) = TIMESTAMP '2020-04-01 0:00:00.000'
  AND NOT o_orderpriority = '4-NOT SPECIFIED'
GROUP BY r_name, o_orderpriority
ORDER BY r_name, o_orderpriority;
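As a rough mental model of what the batch query above computes, the following Python sketch runs the same inner joins, filters, grouping and sorting over tiny in-memory tables. The table contents are hypothetical, and the code only illustrates the query's semantics. It is not how Flink executes a batch join.

```python
from collections import defaultdict
from datetime import date, datetime

# Hypothetical miniature tables; keys follow the column names used in the query
prod_region = {0: "ASIA", 1: "EUROPE"}          # r_regionkey -> r_name
prod_nation = {10: 0, 11: 1}                     # n_nationkey -> n_regionkey
prod_customer = {100: 10, 101: 11, 102: 10}      # c_custkey -> c_nationkey
dev_orders = [  # (o_orderkey, o_custkey, o_orderpriority, o_ordertime)
    (1, 100, "1-URGENT", datetime(2020, 4, 1, 9, 30)),
    (2, 100, "1-URGENT", datetime(2020, 4, 1, 17, 5)),
    (3, 102, "1-URGENT", datetime(2020, 4, 1, 12, 0)),
    (4, 101, "4-NOT SPECIFIED", datetime(2020, 4, 1, 8, 0)),
    (5, 101, "2-HIGH", datetime(2020, 4, 2, 8, 0)),
]

def customers_and_orders(day):
    """Return (region, priority, number_of_customers, number_of_orders) rows."""
    customers = defaultdict(set)    # (region, priority) -> distinct c_custkey values
    order_count = defaultdict(int)  # (region, priority) -> COUNT(o_orderkey)
    for _, o_custkey, priority, ordertime in dev_orders:
        # WHERE FLOOR(o_ordertime TO DAY) = day AND NOT o_orderpriority = '4-NOT SPECIFIED'
        if ordertime.date() != day or priority == "4-NOT SPECIFIED":
            continue
        # Inner joins orders -> customer -> nation -> region: a missing match drops the row
        r_name = prod_region.get(prod_nation.get(prod_customer.get(o_custkey)))
        if r_name is None:
            continue
        customers[(r_name, priority)].add(o_custkey)
        order_count[(r_name, priority)] += 1
    # GROUP BY r_name, o_orderpriority ... ORDER BY r_name, o_orderpriority
    return [(r, p, len(customers[(r, p)]), order_count[(r, p)]) for r, p in sorted(order_count)]

print(customers_and_orders(date(2020, 4, 1)))  # [('ASIA', '1-URGENT', 2, 3)]
```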


Regular join of dynamic tables

A regular dynamic-table join joins a dynamic table with a static table.

Example: replace the static orders table from the previous example with a dynamic table and run the same business logic.

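-- Replace the static orders table dev_orders with the dynamic orders table prod_orders, and drop the ORDER BY clause (streaming mode does not support sorting on these non-time columns)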
SET execution.type=streaming;
USE CATALOG hive;
SELECT
  r_name AS `region`,
  o_orderpriority AS `priority`,
  COUNT(DISTINCT c_custkey) AS `number_of_customers`,
  COUNT(o_orderkey) AS `number_of_orders`
FROM default_catalog.default_database.prod_orders
JOIN prod_customer ON o_custkey = c_custkey
JOIN prod_nation ON c_nationkey = n_nationkey
JOIN prod_region ON n_regionkey = r_regionkey
WHERE
  FLOOR(o_ordertime TO DAY) = TIMESTAMP '2020-04-01 0:00:00.000'
  AND NOT o_orderpriority = '4-NOT SPECIFIED'
GROUP BY r_name, o_orderpriority;

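Notes:

  1. A static table is loaded only once, when the job starts. Later updates to its data are not reflected in the already running job.
  2. Flink writes the rows of every input table into state, so the state keeps growing as input arrives.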
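Both notes can be illustrated with a minimal Python sketch of a streaming inner equi-join. This is a conceptual model, not Flink's implementation, and the class RegularJoin and the sample rows are hypothetical. The customer table is read once as a snapshot at startup, and every row from both inputs stays in state, so the state grows with each input row.

```python
from collections import defaultdict

class RegularJoin:
    """Conceptual streaming inner equi-join: every row of both inputs stays in state."""

    def __init__(self):
        self.left_state = defaultdict(list)   # join key -> all left rows seen so far
        self.right_state = defaultdict(list)  # join key -> all right rows seen so far

    def on_left(self, key, row):
        self.left_state[key].append(row)      # never evicted
        return [(row, right) for right in self.right_state.get(key, [])]

    def on_right(self, key, row):
        self.right_state[key].append(row)     # never evicted
        return [(left, row) for left in self.left_state.get(key, [])]

    def state_size(self):
        return (sum(len(v) for v in self.left_state.values())
                + sum(len(v) for v in self.right_state.values()))

join = RegularJoin()
# The static customer table is read once as a snapshot when the job starts;
# later changes to the source table never reach this running job.
for c_custkey, c_name in [(100, "Alice"), (101, "Bob")]:
    join.on_right(c_custkey, c_name)

output = []
for o_orderkey, o_custkey in [(1, 100), (2, 101), (3, 100)]:  # orders keep streaming in
    output += join.on_left(o_custkey, o_orderkey)

print(output)             # [(1, 'Alice'), (2, 'Bob'), (3, 'Alice')]
print(join.state_size())  # 5: both customers and all three orders are still in state
```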

Time-interval join (Interval Join)

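An interval join is typically used to join events from two (or more) dynamic tables that are related to each other in time, for example events that happen at around the same time. Flink SQL optimizes this kind of join specially.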

Example: join the line-item (sub-order) table with the orders table to find unpaid line items that belong to urgent orders.

USE CATALOG default_catalog;
SELECT
  o_ordertime AS `ordertime`,
  o_orderkey AS `order`,
  l_linenumber AS `linenumber`,
  l_partkey AS `part`,
  l_suppkey AS `supplier`,
  l_quantity AS `quantity`
FROM prod_lineitem
JOIN prod_orders ON o_orderkey = l_orderkey
WHERE
  l_ordertime BETWEEN o_ordertime - INTERVAL '5' MINUTE AND o_ordertime AND
  l_linestatus = 'O' AND
  o_orderpriority = '1-URGENT';

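Notes:

  1. The WHERE clause must relate the left and right tables through a condition on event-time or processing-time attributes. In this example the condition is:
     l_ordertime BETWEEN o_ordertime - INTERVAL '5' MINUTE AND o_ordertime
  2. Because this condition bounds the matching window to 5 minutes, Flink only has to keep roughly the most recent 5 minutes of rows in state. Rows that can no longer satisfy the condition are discarded, which reduces Flink's memory requirements.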
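Here is a simplified Python model of the interval join and its state cleanup. It assumes both inputs arrive in event-time order with no late data, so the watermark is simply the latest timestamp seen. Times are plain integer minutes, and the function name interval_join is hypothetical. Rows are dropped from state as soon as no future row can satisfy the BETWEEN condition.

```python
BOUND = 5  # minutes: l_ordertime BETWEEN o_ordertime - INTERVAL '5' MINUTE AND o_ordertime

def matches(l_time, o_time):
    return o_time - BOUND <= l_time <= o_time

def interval_join(events):
    """events: (side, orderkey, time) tuples in event-time order; side is 'L' or 'O'."""
    lineitems, orders = [], []  # state: (orderkey, time) pairs per side
    results, max_state = [], 0
    for side, key, t in events:
        watermark = t  # assumption: in-order input, no late events
        if side == "L":
            results += [(key, t, ot) for k, ot in orders if k == key and matches(t, ot)]
            lineitems.append((key, t))
        else:
            results += [(key, lt, t) for k, lt in lineitems if k == key and matches(lt, t)]
            orders.append((key, t))
        # A line item can still match orders up to BOUND minutes later;
        # an order can only match line items that are not later than itself.
        lineitems = [(k, lt) for k, lt in lineitems if lt + BOUND >= watermark]
        orders = [(k, ot) for k, ot in orders if ot >= watermark]
        max_state = max(max_state, len(lineitems) + len(orders))
    return results, max_state

events = [("L", 1, 0), ("L", 2, 1), ("O", 1, 3), ("O", 2, 10), ("L", 3, 20), ("O", 3, 22)]
results, max_state = interval_join(events)
print(results)    # [(1, 0, 3), (3, 20, 22)]  as (orderkey, l_ordertime, o_ordertime)
print(max_state)  # 3
```

The line item of order 2 (minute 1) never matches its order (minute 10) because it is more than 5 minutes earlier, and it has already been evicted from state by the time the order arrives.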

Temporal table join (Enrichment Join with Lookup Table in MySQL)

Also called a Temporal Table Join, it is suited to joining an insert-only dynamic table with a static table that is never updated or is updated infrequently.

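Example: join the line-item table prod_lineitem (a dynamic table) with the real-time exchange-rate table prod_rates to convert the open amount of CNY line items using the current CNY rate (result column open_in_euro).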

USE CATALOG default_catalog;

SELECT
  l_proctime AS `querytime`,
  l_orderkey AS `order`,
  l_linenumber AS `linenumber`,
  l_currency AS `currency`,
  rs_rate AS `cur_rate`, 
  (l_extendedprice * (1 - l_discount) * (1 + l_tax)) / rs_rate AS `open_in_euro`
FROM prod_lineitem
JOIN hive.`default`.prod_rates FOR SYSTEM_TIME AS OF l_proctime ON rs_symbol = l_currency
WHERE
  l_linestatus = 'O'
  AND l_currency = 'CNY';
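The amount expression in the query is plain arithmetic: the extended price after discount and tax, divided by the looked-up rate. The helper below reproduces it in Python for a hypothetical line item (price 1000, 5% discount, 8% tax). It shows how the same line item converts at the two CNY rates used in this demo.

```python
def open_in_euro(l_extendedprice, l_discount, l_tax, rs_rate):
    """Same expression as the query: price after discount and tax, divided by the rate."""
    return (l_extendedprice * (1 - l_discount) * (1 + l_tax)) / rs_rate

# Hypothetical line item: 1000 CNY, 5% discount, 8% tax -> 1026 CNY before conversion
before = open_in_euro(1000.0, 0.05, 0.08, 8.0166)  # rate before the MySQL update
after = open_in_euro(1000.0, 0.05, 0.08, 9.999)    # rate after the MySQL update
print(round(before, 2), round(after, 2))           # 127.98 102.61
```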
  

Query result:

[Screenshot: query result]

As shown in the screenshot above, the CNY rate is 8.0166.

Next, change the CNY rate in the MySQL dimension table to 9.999:

# Change the CNY rate (opens a MySQL client in the demo's mysql container)
docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql

SELECT * FROM PROD_RATES;

UPDATE PROD_RATES SET RS_TIMESTAMP = '2020-04-01 01:00:00.000', RS_RATE = 9.999 WHERE RS_SYMBOL='CNY';


In the real-time join result, the rate also changes to 9.999:
[Screenshot: join result after the update]

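Notes:

  1. Processing-time semantics: each row is matched against the rows of the static exchange-rate table in MySQL as they are at the row's processing time.
  2. Updates to the MySQL dimension table take effect in the running job in real time: rows processed after the update see the new value, while results that were already emitted are not revised.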
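The processing-time behavior described in these notes can be sketched in Python, with a dict standing in for the MySQL table PROD_RATES. The names mysql_rates and lookup_join are hypothetical, and any caching done by a real lookup connector is ignored. Each row reads the rate that is current at the moment it is processed, so rows processed after the update see 9.999 while earlier results keep 8.0166.

```python
# Hypothetical stand-in for the MySQL table PROD_RATES: RS_SYMBOL -> RS_RATE
mysql_rates = {"CNY": 8.0166}

def lookup_join(lineitem, rates):
    """Enrich one line item with the rate stored in the table when the row is processed."""
    rate = rates.get(lineitem["l_currency"])  # FOR SYSTEM_TIME AS OF l_proctime
    if rate is None:  # inner join: no matching dimension row, no output row
        return None
    return {**lineitem, "cur_rate": rate}

results = [lookup_join({"l_orderkey": 1, "l_currency": "CNY"}, mysql_rates)]
mysql_rates["CNY"] = 9.999  # corresponds to the UPDATE PROD_RATES ... statement above
results.append(lookup_join({"l_orderkey": 2, "l_currency": "CNY"}, mysql_rates))

print([r["cur_rate"] for r in results])  # [8.0166, 9.999]
```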

Key syntax:

JOIN hive.`default`.prod_rates FOR SYSTEM_TIME AS OF l_proctime ON rs_symbol = l_currency

Specify the processing-time attribute of the dynamic table (l_proctime) in the join: FOR SYSTEM_TIME AS OF l_proctime

Temporal table function join (Enrichment Join against Temporal Table)

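A Temporal Table Function Join joins each event against a changelog, so that the event is matched with the version that was valid at its exact point in event time.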

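Example: convert the open amount of orders in every currency using the exchange rate that was in effect when each order was placed.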

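This reuses the requirement from the Temporal Table Join example, but replaces the MySQL dimension table with a Kafka one: whenever a rate changes, the new rate is written to Kafka.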

Use the TemporalTableFunction prod_rates_temporal to look up the rate that was valid at each line item's order time (l_ordertime):

USE CATALOG default_catalog;

SELECT
  l_ordertime AS `ordertime`,
  l_orderkey AS `order`,
  l_linenumber AS `linenumber`,
  l_currency AS `currency`,
  rs_rate AS `cur_rate`, 
  (l_extendedprice * (1 - l_discount) * (1 + l_tax)) / rs_rate AS `open_in_euro`
FROM
  prod_lineitem,
  LATERAL TABLE(prod_rates_temporal(l_ordertime))
WHERE rs_symbol = l_currency AND
  l_linestatus = 'O';

Result:
[Screenshot: query result]

Notes:

  1. Event-time semantics: rows (rates) of the temporal table (the Kafka topic) are matched based on event time.
  2. A rate is changed by producing a new record to the Kafka topic.

Key syntax:
LATERAL TABLE(prod_rates_temporal(l_ordertime))

  1. LATERAL TABLE: the keyword used to join with a Temporal Table Function
  2. prod_rates_temporal(l_ordertime): the function over the rate changelog, called with the event time as its argument
  3. As of Flink 1.10, only inner joins are supported.
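The following Python sketch illustrates the event-time semantics and the inner-join restriction. It models prod_rates_temporal as a function that returns the rate table as of a given time, built from an append-only changelog like the Kafka topic. It assumes the complete changelog is already known, whereas real Flink relies on watermarks to decide when a version is final. The sample data and the helper names build_versions and lateral_inner_join are hypothetical. A row with no rate version at or before its event time produces no output, matching inner-join behavior.

```python
from bisect import bisect_right
from collections import defaultdict

# Hypothetical append-only changelog, as it might arrive from the Kafka topic:
# (rs_timestamp in minutes, rs_symbol, rs_rate)
RATE_CHANGELOG = [(0, "CNY", 8.0166), (60, "CNY", 9.999), (10, "USD", 1.09)]

def build_versions(changelog):
    """Group the changelog by symbol into a time-sorted version history."""
    versions = defaultdict(list)
    for ts, symbol, rate in sorted(changelog):
        versions[symbol].append((ts, rate))
    return versions

VERSIONS = build_versions(RATE_CHANGELOG)

def prod_rates_temporal(t):
    """Rough analogue of the temporal table function: (symbol, rate) rows valid at time t."""
    snapshot = []
    for symbol, history in VERSIONS.items():
        i = bisect_right([ts for ts, _ in history], t)
        if i > 0:  # a version exists at or before t
            snapshot.append((symbol, history[i - 1][1]))
    return snapshot

def lateral_inner_join(lineitems):
    """Call the function once per row with its event time (LATERAL TABLE); keep matches only."""
    out = []
    for orderkey, currency, ordertime in lineitems:
        for rs_symbol, rs_rate in prod_rates_temporal(ordertime):
            if rs_symbol == currency:  # WHERE rs_symbol = l_currency
                out.append((orderkey, currency, ordertime, rs_rate))
    return out

lineitems = [(1, "CNY", 30), (2, "CNY", 75), (3, "USD", 5), (4, "USD", 20)]
print(lateral_inner_join(lineitems))
# [(1, 'CNY', 30, 8.0166), (2, 'CNY', 75, 9.999), (4, 'USD', 20, 1.09)]
```

Line item 1 gets 8.0166 regardless of when the minute-60 change is processed, because the version is chosen by the row's event time. Line item 3 is dropped because no USD rate existed yet at minute 5.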