欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink中的临时表 Temporal Tables

程序员文章站 2022-07-14 13:38:56
...

临时表 Temporal Tables

翻译自flink官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/temporal_tables.html#top

临时表表示一个可变表上(参数化)视图的概念,该视图返回表在特定时间点的内容。

可变表可以是跟踪变更的变更历史表(如数据库变更日志),也可以是具体化变更的可变维度表(如数据库表)。

对于变更历史表,Flink可以跟踪更改,并允许在查询中以某个时间点为依据访问表的内容。在Flink中,这种表由一个临时表函数(Temporal Table Function)表示。

对于可变维度表,Flink允许在查询中以处理时间(processing time)为依据访问表的内容。在Flink中,这种表由一个临时表(Temporal Table)表示。

使用临时表的动机 Motivation

关联变更记录表 Correlate with a changing history table

假设有如下,汇率变更历史记录表:

SELECT * FROM RatesHistory;

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

RatesHistory表示不断增长的仅追加的日元汇率的表。
例如,从09:00到10:45的欧元对日元的汇率为114。从10:45到11:15的汇率为116。

假设我们要在10:58的时间输出所有当前汇率,则需要以下SQL查询来计算结果表:

SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = r.currency
  AND r2.rowtime <= TIME '10:58');

相关子查询确定对应货币的最大时间小于或等于所需时间,外部查询列出具有最大时间戳的汇率。

下表显示了这种计算的结果, 在示例中,考虑了10:45时欧元的更新,但是10:58时表的版本中未考虑11:15时欧元的更新和新的英镑输入。

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Yen           1
10:45   Euro        116

临时表的概念旨在简化此类查询,加快它们的执行速度,并减少Flink的状态使用。临时表是仅追加(append-only)表上的参数化视图,它将append-only表的行解释为表的变更日志,并在特定时间点提供该表的特定版本。将append- only表解释为变更日志需要指定主键属性和时间戳属性。主键确定覆盖哪些行,时间戳确定行有效的时间。

在上面的示例中,currency将是RatesHistory表的主键,rowtime将是timestamp属性。
在Flink中,这由一个临时表函数(Temporal Table Function)表示。

关联可变维度表 Correlate with a changing dimension table

另一方面,一些用例需要连接一个不断变化的维度表,这个维度表是一个外部数据库表。

假设LatestRates是一个以最新汇率具体化的表格。 LatestRates是历史汇率的具体化。 那么在10:58时的LatestRates表的内容将是:

10:58> SELECT * FROM LatestRates;
currency   rate
======== ======
US Dollar   102
Yen           1
Euro        116

在12:00时的LatestRates表的内容将是:

12:00> SELECT * FROM LatestRates;
currency   rate
======== ======
US Dollar   102
Yen           1
Euro        119
Pounds      108

在flink中,由临时表(Temporal Table)表示。

临时表函数 Temporal Table Function

为了访问临时表中的数据,必须传递一个时间属性,该属性确定将要返回的表的版本。 Flink使用表函数的SQL语法提供一种表达它的方法。

定义后,临时表函数将使用单个时间参数timeAttribute并返回一些行。 这些行包含相对于给定时间属性的所有现有主键的行的最新版本。

假设我们基于RatesHistory表定义了一个临时表函数Rates(timeAttribute),则可以通过以下方式查询该函数:

SELECT * FROM Rates('10:15');

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1

SELECT * FROM Rates('11:00');

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
10:45   Euro        116
09:00   Yen           1

对函数Rates(timeAttribute)的每个查询都将返回给定timeAttribute的Rates状态。

注意:目前,Flink不支持使用常量时间属性参数直接查询临时表函数。同时,临时表函数只能在join中使用。 上面的示例仅用于提供有关Rates(timeAttribute)函数返回值的直观信息。

另请参阅有关用于连续查询的联接( joins for continuous queries)的页面,以获取有关如何与临时表联接的更多信息。

定义临时表函数 Defining Temporal Table Function

以下代码段说明了如何从仅追加(append-only)表中创建临时表函数。

import org.apache.flink.table.functions.TemporalTableFunction;
(...)

// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Provide a static data set of the rates history table.
List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
ratesHistoryData.add(Tuple2.of("Euro", 114L));
ratesHistoryData.add(Tuple2.of("Yen", 1L));
ratesHistoryData.add(Tuple2.of("Euro", 116L));
ratesHistoryData.add(Tuple2.of("Euro", 119L));

// Create and register an example table using above data set.
// In the real setup, you should replace this with your own table.
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");

tEnv.createTemporaryView("RatesHistory", ratesHistory);

// Create and register a temporal table function.
// Define "r_proctime" as the time attribute and "r_currency" as the primary key.
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
tEnv.registerFunction("Rates", rates);      // <==== (2)


第(1)行创建了一个汇率临时表函数,该函数使我们能够在Table API中使用函数。

第(2)行在表环境中以Rates名称注册了该函数,这使我们可以在SQL中使用Rates函数。

临时表 Temporal Table

注意:Temporal Table 仅适用于Blink Planner。

为了访问临时表中的数据,当前必须使用LookupableTableSource定义一个TableSource。 Flink使用FOR SYSTEM_TIME AS OF的SQL语法查询临时表,该语法于SQL:2011中提出。

假设我们定义了一个名为LatestRates的临时表,我们可以通过以下方式查询此类表:

SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';

currency   rate
======== ======
US Dollar   102
Euro        116
Yen           1

注意:目前,Flink不支持以固定时间直接查询临时表。同时,临时表只能在联接中使用。 上面的示例仅用于提供有关临时表LatestRates返回的直观信息。

另请参阅有关用于连续查询的联接的页面,以获取有关如何与临时表联接的更多信息。

定义临时表 Defining Temporal Table

// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// Create an HBaseTableSource as a temporal table which implements LookableTableSource
// In the real setup, you should replace this with your own table.
HBaseTableSource rates = new HBaseTableSource(conf, "Rates");
rates.setRowKey("currency", String.class);   // currency as the primary key
rates.addColumn("fam1", "rate", Double.class);

// register the temporal table into environment, then we can query it in sql
tEnv.registerTableSource("Rates", rates);

另请参阅有关如何定义LookupableTableSource的页面。