Flink之Table API&SQL(DDL/DML)
一、Table API&SQL的算子操作
1.1、初始化查询
下面的示例显示如何在已注册和内联的表上指定SQL查询。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sqlQuery(
s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
// SQL update with a registered table
// create and register a TableSink
val csvSink: CsvTableSink = new CsvTableSink("/path/to/file", ...)
val fieldNames: Array[String] = Array("product", "amount")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
1.2、支持的语法
Flink 使用 Apache Calcite 解析 SQL,支持标准的 ANSI SQL。但是不支持 DDL。
下面的 BNF-grammar 描述了批处理和流查询中支持的SQL特性。Operations 部分展示了支持的示例,并指出哪些特性只支持批处理或流查询。
insert:
INSERT INTO tableReference
query
query:
values
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| UNNEST '(' expression ')'
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
Flink SQL对标识符(表、属性、函数名)使用词法策略,类似于Java:
- 无论是否引用标识符,都会保留标识符的大小写。
- 标识符将区分大小写地匹配。
- 与Java不同,反引号允许标识符包含非字母数字字符(例如。
"SELECT a AS `my field` FROM t",带空格
)。
字符串文字必须用单引号括起来(例如,SELECT 'Hello World'
)。复制一个转义单引号(例如,SELECT 'It''s me.'
)。Unicode字符支持字符串文字。如果需要显式unicode代码点,请使用以下语法:
- 使用反斜杠(\)作为转义字符(默认):
SELECT U&'\263A'
- 使用自定义转义字符:
SELECT U&'#263A' UESCAPE '#'
二、DML(主要是查询)
2.1、show和use
名称 | 描述 |
Show Batch Streaming |
Show all catalogs
Show all databases in the current catalog
Show all tables in the current database in the current catalog
|
Use Batch Streaming |
Set current catalog for the session
Set current database of the current catalog for the session
|
2.2、Scan, Projection, and Filter
名称 | 描述 |
Scan / Select / As Batch Streaming |
|
Where / Filter Batch Streaming |
|
User-defined Scalar Functions (Scalar UDF) Batch Streaming |
UDFs 必须注册到 TableEnvironment。具体使用方法请查阅://TODO
|
2.3、Aggregations
名称 | 描述 |
GroupBy Aggregation Batch Streaming Result Updating |
注意:流表上的GroupBy会产生更新后的结果。详情请查阅://TODO
|
GroupBy Window Aggregation Batch Streaming |
使用分组窗口计算每组的结果行,详情查看下方Group Windows。
|
Over Window aggregation Streaming |
注意:所有聚合必须在同一个窗口上定义,即、相同的分区、排序和范围。目前,只支持具有当前行范围之前(*和有界)的窗口。还不支持 FOLLOWING 内容的范围。ORDER BY必须在单个时间属性上指定
|
Distinct Batch Streaming Result Updating |
注意:对于流查询,计算查询结果所需的状态可能会无限增长,这取决于不同字段的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。详情请查阅://TODO |
Grouping sets, Rollup, Cube Batch |
|
Having Batch Streaming |
|
User-defined Aggregate Functions (UDAGG) Batch Streaming |
UDAGGs 必须注册到 TableEnvironment。具体操作请请查阅://TODO
|
2.4、 Joins
名称 | 描述 |
Inner Equi-join Batch Streaming |
目前,只支持 equi-joins,即,具有至少一个连接条件并带有相等字段的连接。不支持任意交叉或连接。 注意:连接顺序没有优化。表按照FROM子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,不支持交叉连接,并且会导致查询失败。
注意: 对于流查询,计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。 详情请查阅://TODO |
Outer Equi-join Batch Streaming Result Updating |
目前,只支持 equi-joins,即,具有至少一个连接条件并带有相等条件的连接。不支持任意交叉或连接。 Note: 连接顺序没有优化。表按照FROM子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,交叉连接不受支持,并且会导致查询失败。
Note: 对于流查询,计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。 详情请查阅://TODO |
Time-windowed Join Batch Streaming |
Note: 时间窗口连接是可以以流方式处理常规的连接子集。 有时间窗的连接至少需要一个等连接字段和一个连接条件,该条件在连接两边限定时间。这种条件可以由两个适当的范围字段(<、<=、>=、>)、一个BETWEEN字段或一个比较同一类型时间属性的等式字段(即,处理时间或事件时间)。 例如,以下字段是有效的窗口连接条件:
|
Expanding arrays into a relation Batch Streaming |
WITH ORDINALITY 不支持。
|
Join with Table Function Batch Streaming |
将表与表函数的结果连接在一起。左边(外部)表的每一行都与表函数的相应调用生成的所有行连接在一起。 必须在此之前注册用户定义的表函数(UDTFs)。有关如何指定和注册UDTFs的详细信息,请参阅://TODO Inner Join A row of the left (outer) table is dropped, if its table function call returns an empty result.
Left Outer Join 如果表函数调用返回空结果,则保留相应的外部行,并使用空值填充结果。
Note: 目前,只支持literal TRUE作为针对横向表的左外连接的谓词。 |
Join with Temporal Table Streaming |
时态表是跟踪随时间变化的表。 时态表函数提供对特定时间点时态表状态的访问。使用时态表函数联接表的语法与使用表函数联接的语法相同。 Note: 目前只支持与时态表的内部连接。 假设 Rates 是时态表函数,则连接可以用SQL表示如下:
有关更多信息,请查阅://TODO |
2.5、Set Operations
名称 | 描述 |
Union Batch |
|
UnionAll Batch Streaming |
|
Intersect / Except Batch |
|
In Batch Streaming |
如果给定表子查询中存在表达式,则返回true。子查询表必须由一列组成。该列必须具有与表达式相同的数据类型。
Note: 对于流查询,算子在join和group操作中重写。计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。详情请查阅://TODO |
Exists Batch Streaming |
如果子查询至少返回一行,则返回true。只有在可以在联接和组操作中重写算子时,才支持该操作。
Note:对于流查询,算子在 join 和 group 算子中重写。计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见//TODO。 |
2.6、OrderBy & Limit
名称 | 说明 |
Order By Batch Streaming |
Note: 流查询的结果必须主要根据升序时间属性进行排序。还支持其他排序属性。有关时间属性请查阅://TODO
|
Limit Batch |
Note: LIMIT 子句要求有 ORDER BY 子句。
|
2.7 Insert
名称 | 说明 |
Insert Into Batch Streaming |
输出表必须在TableEnvironment中注册(详情请查阅://TODO)。此外,已注册表的模式必须与查询的模式匹配。
|
2.8、Group Windows
group windows 是在 SQL 查询的 Group BY 子句中定义。与使用常规 GROUP BY 子句的查询一样,使用包含 GROUP 窗口函数的 GROUP BY 子句的查询为每个组计算一个结果行。批处理表和流表上的SQL支持以下 group windows 函数。
group 窗口函数 | 说明 |
TUMBLE(time_attr, interval) |
定义一个滚动时间窗口。翻滚时间窗口将行分配给具有固定持续时间(间隔)的非重叠连续窗口。 例如,一个5分钟滚动窗口以5分钟为间隔将行分组。翻滚窗口可以在事件时间(流+批处理)或处理时间(流)上定义。 |
HOP(time_attr, interval, interval) |
定义一个跳转时间窗口(在 Table API 中称为滑动窗口)。一个跳跃时间窗口有一个固定的持续时间(第二个间隔参数)和一个指定的跳跃间隔(第一个间隔参数)。如果跳转间隔小于窗口大小,则跳转窗口重叠。因此,可以将行分配给多个窗口。 例如,一个15分钟大小的跳转窗口和5分钟的跳转间隔将每一行分配给3个15分钟大小的不同窗口,这些窗口在5分钟的间隔内计算。跳跃窗口可以在事件时间(流+批处理)或处理时间(流)上定义。 |
SESSION(time_attr, interval) |
定义会话时间窗口。会话时间窗口没有固定的持续时间,但它们的边界由不活动的时间间隔定义,即,如果在定义的间隔期间没有出现事件,则关闭会话窗口。 |
时间属性
对于流表上的SQL查询,group window 函数的 time_attr 参数必须引用一个有效的时间属性,该属性指定行 processing time 或 event time。参见//TODO,了解如何定义时间属性。
对于批处理表上的SQL,组窗口函数的 time_attr 参数必须是 TIMESTAMP 类型的属性。
选择组窗口开始和结束时间戳
组窗口的开始和结束时间戳以及时间属性可以通过以下辅助功能来选择:
方法 | 描述 |
TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval)
|
返回相应滚动、跳转或会话窗口的包含下界的时间戳。 |
TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval)
|
返回相应滚动、跳转或会话窗口的独占上界的时间戳。 注意:在后续基于时间的操作中,例如时间窗口连接和组窗口或窗口聚合,不能将独占的上界时间戳用作rowtime属性。 |
TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval)
|
返回相应滚动、跳转或会话窗口的包含上限的时间戳。 结果属性是一个 rowtime 属性,可用于后续基于时间的操作,如时间窗口连接和组窗口或窗口聚合。 |
TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval)
|
返回 proctime 属性,该属性可用于后续基于时间的操作,如时间窗口连接和组窗口或窗口聚合。 |
注意: 必须使用与
GROUP
BY 子句中的 group window 函数完全相同的参数调用辅助函数。
下面的示例显示如何在流表上使用组窗口指定SQL查询。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// read a DataStream from an external source
val ds: DataStream[(Long, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)
// compute SUM(amount) per day (in event-time)
val result1 = tableEnv.sqlQuery(
"""
|SELECT
| user,
| TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
| SUM(amount)
| FROM Orders
| GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
""".stripMargin)
// compute SUM(amount) per day (in processing-time)
val result2 = tableEnv.sqlQuery(
"SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")
// compute every hour the SUM(amount) of the last 24 hours in event-time
val result3 = tableEnv.sqlQuery(
"SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
val result4 = tableEnv.sqlQuery(
"""
|SELECT
| user,
| SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
| SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
| SUM(amount)
| FROM Orders
| GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
""".stripMargin)
2.9、Pattern Recognition
名称 | 描述 |
MATCH_RECOGNIZE Streaming |
根据 match_recognition ISO 标准在流表中搜索给定的模式。这使得在SQL查询中表达复杂事件处理(CEP)逻辑成为可能。 有关更详细的描述,请参见//TODO。
|
三、DDL
TableEnvironment 的 sqlUpdate()方法指定 DDL。建表成功后该方法不返回任何内容。可以使用 CREATE TABLE 语句将表注册到 Catalog 中,可以在 sqlQuery() 方法中被引用。
注意:
Flink 的 DDL 支持尚未完成。包含不支持的 SQL 特性的查询会导致表异常。下面几节列出了批处理表和流表上支持的 SQL DDL 特性。
3.1、指定DDL
下面的示例显示如何指定 SQL DDL。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
// SQL query with a registered table
// register a table named "Orders"
tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
// SQL update with a registered table
// register a TableSink
tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH ('connector.path'='/path/to/file' ...)");
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
3.2、建表
CREATE TABLE [catalog_name.][db_name.]table_name
[(col_name1 col_type1 [COMMENT col_comment1], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name1, col_name2, ...)]
WITH (key1=val1, key2=val2, ...)
创建具有给定表属性的表。如果数据库中已经存在同名表,则抛出异常。
按指定列对创建的表进行分区。如果将该表用作文件系统接收器,则为每个分区创建一个目录。
用于创建表源/接收器的表属性。这些属性通常用于查找和创建底层连接器。
表达式key1=val1的键和值都应该是字符串。
注意:表名称有三种格式:
1.
catalog_name.db_name.table_name:
表注册到 metastore,目录名为
catalog_name,库名为db_name;2.
db_name.table_name:
将表注册到执行表环境和数据库的当前目录为db_name;3.
table_name:
表将被注册到执行表环境的当前目录和数据库中。
注意:使用
CREATE TABLE
语句注册的表可以同时用作表源和表接收器,在 DMLs 中引用它之前,我们无法确定它是用作源还是接收器。
3.3、删表
DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
删除具有给定表名的表。如果要删除的表不存在,则抛出异常。
如果表不存在,则什么也不会发生。
3.4、数据类型
详细的数据类型介绍请查阅 //TODO,一下做一个简单的说明。
泛型类型和(嵌套的)复合类型(例如 POJOs, tuples, rows, Scala case classes)也可以是一行的字段。
可以使用值访问函数访问具有任意嵌套的复合类型的字段。详细的方法请查阅 //TODO
泛型类型被视为一个黑盒,可以由用户定义的函数传递或处理。详情请查阅 //TODO
对于 DDLs,我们支持在页面数据类型中定义的完整数据类型。详细的数据类型请查阅 //TODO
注意:SQL查询中不支持一些数据类型,例如:
STRING
,BYTES
,TIME(p) WITHOUT TIME ZONE
,TIME(p) WITH LOCAL TIME ZONE
,TIMESTAMP(p) WITHOUT TIME ZONE
,TIMESTAMP(p) WITH LOCAL TIME ZONE
,ARRAY
,MULTISET
,ROW。
3.5、保留关键字
虽然还没有实现所有SQL特性,但是一些字符串组合已经被保留为关键字,以供将来使用。如果想使用下列字符串之一作为字段名,请确保用反引号(例如`value`
, `count`
)。
参考:https://blog.csdn.net/zpf_940810653842/category_9074862.html