Flink中的临时表 Temporal Tables
临时表 Temporal Tables
翻译自flink官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/temporal_tables.html#top
临时表表示一个可变表上(参数化)视图的概念,该视图返回表在特定时间点的内容。
可变表可以是跟踪变更的变更历史表(如数据库变更日志),也可以是具体化变更的可变维度表(如数据库表)。
对于变更历史表,Flink可以跟踪更改,并允许在查询中以某个时间点为依据访问表的内容。在Flink中,这种表由一个临时表函数(Temporal Table Function)表示。
对于可变维度表,Flink允许在查询中以处理时间(processing time)为依据访问表的内容。在Flink中,这种表由一个临时表(Temporal Table)表示。
使用临时表的动机 Motivation
关联变更记录表 Correlate with a changing history table
假设有如下,汇率变更历史记录表:
SELECT * FROM RatesHistory;
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
10:45 Euro 116
11:15 Euro 119
11:49 Pounds 108
RatesHistory表示不断增长的仅追加的日元汇率的表。
例如,从09:00到10:45的欧元对日元的汇率为114。从10:45到11:15的汇率为116。
假设我们要在10:58的时间输出所有当前汇率,则需要以下SQL查询来计算结果表:
SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (
SELECT MAX(rowtime)
FROM RatesHistory AS r2
WHERE r2.currency = r.currency
AND r2.rowtime <= TIME '10:58');
相关子查询确定对应货币的最大时间小于或等于所需时间,外部查询列出具有最大时间戳的汇率。
下表显示了这种计算的结果, 在示例中,考虑了10:45时欧元的更新,但是10:58时表的版本中未考虑11:15时欧元的更新和新的英镑输入。
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Yen 1
10:45 Euro 116
临时表的概念旨在简化此类查询,加快它们的执行速度,并减少Flink的状态使用。临时表是仅追加(append-only)表上的参数化视图,它将append-only表的行解释为表的变更日志,并在特定时间点提供该表的特定版本。将append- only表解释为变更日志需要指定主键属性和时间戳属性。主键确定覆盖哪些行,时间戳确定行有效的时间。
在上面的示例中,currency将是RatesHistory表的主键,rowtime将是timestamp属性。
在Flink中,这由一个临时表函数(Temporal Table Function)表示。
关联可变维度表 Correlate with a changing dimension table
另一方面,一些用例需要连接一个不断变化的维度表,这个维度表是一个外部数据库表。
假设LatestRates是一个以最新汇率具体化的表格。 LatestRates是历史汇率的具体化。 那么在10:58时的LatestRates表的内容将是:
10:58> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Yen 1
Euro 116
在12:00时的LatestRates表的内容将是:
12:00> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Yen 1
Euro 119
Pounds 108
在flink中,由临时表(Temporal Table)表示。
临时表函数 Temporal Table Function
为了访问临时表中的数据,必须传递一个时间属性,该属性确定将要返回的表的版本。 Flink使用表函数的SQL语法提供一种表达它的方法。
定义后,临时表函数将使用单个时间参数timeAttribute并返回一些行。 这些行包含相对于给定时间属性的所有现有主键的行的最新版本。
假设我们基于RatesHistory表定义了一个临时表函数Rates(timeAttribute),则可以通过以下方式查询该函数:
SELECT * FROM Rates('10:15');
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
SELECT * FROM Rates('11:00');
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
10:45 Euro 116
09:00 Yen 1
对函数Rates(timeAttribute)的每个查询都将返回给定timeAttribute的Rates状态。
注意:目前,Flink不支持使用常量时间属性参数直接查询临时表函数。同时,临时表函数只能在join中使用。 上面的示例仅用于提供有关Rates(timeAttribute)函数返回值的直观信息。
另请参阅有关用于连续查询的联接( joins for continuous queries)的页面,以获取有关如何与临时表联接的更多信息。
定义临时表函数 Defining Temporal Table Function
以下代码段说明了如何从仅追加(append-only)表中创建临时表函数。
import org.apache.flink.table.functions.TemporalTableFunction;
(...)
// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Provide a static data set of the rates history table.
List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
ratesHistoryData.add(Tuple2.of("Euro", 114L));
ratesHistoryData.add(Tuple2.of("Yen", 1L));
ratesHistoryData.add(Tuple2.of("Euro", 116L));
ratesHistoryData.add(Tuple2.of("Euro", 119L));
// Create and register an example table using above data set.
// In the real setup, you should replace this with your own table.
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
tEnv.createTemporaryView("RatesHistory", ratesHistory);
// Create and register a temporal table function.
// Define "r_proctime" as the time attribute and "r_currency" as the primary key.
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
tEnv.registerFunction("Rates", rates); // <==== (2)
第(1)行创建了一个汇率临时表函数,该函数使我们能够在Table API中使用函数。
第(2)行在表环境中以Rates名称注册了该函数,这使我们可以在SQL中使用Rates函数。
临时表 Temporal Table
注意:Temporal Table 仅适用于Blink Planner。
为了访问临时表中的数据,当前必须使用LookupableTableSource定义一个TableSource。 Flink使用FOR SYSTEM_TIME AS OF的SQL语法查询临时表,该语法于SQL:2011中提出。
假设我们定义了一个名为LatestRates的临时表,我们可以通过以下方式查询此类表:
SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
currency rate
======== ======
US Dollar 102
Euro 116
Yen 1
注意:目前,Flink不支持以固定时间直接查询临时表。同时,临时表只能在联接中使用。 上面的示例仅用于提供有关临时表LatestRates返回的直观信息。
另请参阅有关用于连续查询的联接的页面,以获取有关如何与临时表联接的更多信息。
定义临时表 Defining Temporal Table
// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
// Create an HBaseTableSource as a temporal table which implements LookableTableSource
// In the real setup, you should replace this with your own table.
HBaseTableSource rates = new HBaseTableSource(conf, "Rates");
rates.setRowKey("currency", String.class); // currency as the primary key
rates.addColumn("fam1", "rate", Double.class);
// register the temporal table into environment, then we can query it in sql
tEnv.registerTableSource("Rates", rates);
另请参阅有关如何定义LookupableTableSource的页面。
上一篇: Flink DataStream API 编程指南
下一篇: JDK1.7和JDK1.8新特性