欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink之Table API&SQL(DDL/DML)

程序员文章站 2024-03-21 09:12:46
...

一、Table API&SQL的算子操作

Flink之Table API&SQL(DDL/DML)

 1.1、初始化查询

下面的示例显示如何在已注册和内联的表上指定SQL查询。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
 
// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
 
// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 
// SQL update with a registered table
// create and register a TableSink
val csvSink: CsvTableSink = new CsvTableSink("/path/to/file", ...)
val fieldNames: Array[String] = Array("product", "amount")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 

1.2、支持的语法

Flink 使用 Apache Calcite 解析 SQL,支持标准的 ANSI SQL。但是不支持 DDL。

下面的 BNF-grammar 描述了批处理和流查询中支持的SQL特性。Operations 部分展示了支持的示例,并指出哪些特性只支持批处理或流查询。

insert:
  INSERT INTO tableReference
  query
  
query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
 
orderItem:
  expression [ ASC | DESC ]
 
select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
  
selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
 
projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *
 
tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
 
joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'
 
tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
 
tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'
 
values:
  VALUES expression [, expression ]*
 
groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'
 
windowRef:
    windowName
  | windowSpec
 
windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'
 
matchRecognize:
      MATCH_RECOGNIZE '('
      [ PARTITION BY expression [, expression ]* ]
      [ ORDER BY orderItem [, orderItem ]* ]
      [ MEASURES measureColumn [, measureColumn ]* ]
      [ ONE ROW PER MATCH ]
      [ AFTER MATCH
            ( SKIP TO NEXT ROW
            | SKIP PAST LAST ROW
            | SKIP TO FIRST variable
            | SKIP TO LAST variable
            | SKIP TO variable )
      ]
      PATTERN '(' pattern ')'
      [ WITHIN intervalLiteral ]
      DEFINE variable AS condition [, variable AS condition ]*
      ')'
 
measureColumn:
      expression AS alias
 
pattern:
      patternTerm [ '|' patternTerm ]*
 
patternTerm:
      patternFactor [ patternFactor ]*
 
patternFactor:
      variable [ patternQuantifier ]
 
patternQuantifier:
      '*'
  |   '*?'
  |   '+'
  |   '+?'
  |   '?'
  |   '??'
  |   '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  |   '{' repeat '}'

 Flink SQL对标识符(表、属性、函数名)使用词法策略,类似于Java:

  • 无论是否引用标识符,都会保留标识符的大小写。
  • 标识符将区分大小写地匹配。
  • 与Java不同,反引号允许标识符包含非字母数字字符(例如。"SELECT a AS `my field` FROM t",带空格)。

字符串文字必须用单引号括起来(例如,SELECT 'Hello World')。复制一个转义单引号(例如,SELECT 'It''s me.')。Unicode字符支持字符串文字。如果需要显式unicode代码点,请使用以下语法:

  • 使用反斜杠(\)作为转义字符(默认):SELECT U&'\263A'
  • 使用自定义转义字符:SELECT U&'#263A' UESCAPE '#'

二、DML(主要是查询)

2.1、show和use

名称 描述
Show
Batch Streaming

Show all catalogs

SHOW CATALOGS;

Show all databases in the current catalog

SHOW DATABASES;

Show all tables in the current database in the current catalog

SHOW TABLES;
Use
Batch Streaming

Set current catalog for the session

USE CATALOG mycatalog;

Set current database of the current catalog for the session

USE mydatabase;

2.2、Scan, Projection, and Filter

名称 描述
Scan / Select / As
Batch Streaming
 
  1. SELECT * FROM Orders

  2. SELECT a, c AS d FROM Orders

Where / Filter
Batch Streaming
 
  1. SELECT * FROM Orders WHERE b = 'red'

  2. SELECT * FROM Orders WHERE a % 2 = 0

User-defined Scalar Functions (Scalar UDF)
Batch Streaming

UDFs 必须注册到 TableEnvironment。具体使用方法请查阅://TODO

SELECT PRETTY_PRINT(user) FROM Orders

2.3、Aggregations

名称 描述
GroupBy Aggregation
Batch Streaming
Result Updating

注意:流表上的GroupBy会产生更新后的结果。详情请查阅://TODO

 
  1. SELECT a, SUM(b) as d

  2. FROM Orders

  3. GROUP BY a

GroupBy Window Aggregation
Batch Streaming

使用分组窗口计算每组的结果行,详情查看下方Group Windows。

  1. SELECT user, SUM(amount)

  2. FROM Orders

  3. GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user

Over Window aggregation
Streaming

注意:所有聚合必须在同一个窗口上定义,即、相同的分区、排序和范围。目前,只支持具有当前行范围之前(*和有界)的窗口。还不支持 FOLLOWING 内容的范围。ORDER BY必须在单个时间属性上指定

  1. SELECT COUNT(amount) OVER (

  2. PARTITION BY user

  3. ORDER BY proctime

  4. ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

  5. FROM Orders

  6.  
  7. SELECT COUNT(amount) OVER w, SUM(amount) OVER w

  8. FROM Orders

  9. WINDOW w AS (

  10. PARTITION BY user

  11. ORDER BY proctime

  12. ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)

Distinct
Batch Streaming
Result Updating
SELECT DISTINCT users FROM Orders

注意:对于流查询,计算查询结果所需的状态可能会无限增长,这取决于不同字段的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。详情请查阅://TODO

Grouping sets, Rollup, Cube
Batch
 
  1. SELECT SUM(amount)

  2. FROM Orders

  3. GROUP BY GROUPING SETS ((user), (product))

Having
Batch Streaming
 
  1. SELECT SUM(amount)

  2. FROM Orders

  3. GROUP BY users

  4. HAVING SUM(amount) > 50

User-defined Aggregate Functions (UDAGG)
Batch Streaming

UDAGGs 必须注册到 TableEnvironment。具体操作请请查阅://TODO

 
  1. SELECT MyAggregate(amount)

  2. FROM Orders

  3. GROUP BY users

2.4、 Joins

名称 描述
Inner Equi-join
Batch Streaming

目前,只支持 equi-joins,即,具有至少一个连接条件并带有相等字段的连接。不支持任意交叉或连接。

注意:连接顺序没有优化。表按照FROM子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,不支持交叉连接,并且会导致查询失败。

 
  1. SELECT *

  2. FROM Orders INNER JOIN Product ON Orders.productId = Product.id

注意: 对于流查询,计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。 详情请查阅://TODO

Outer Equi-join
Batch Streaming Result Updating

目前,只支持 equi-joins,即,具有至少一个连接条件并带有相等条件的连接。不支持任意交叉或连接。

Note: 连接顺序没有优化。表按照FROM子句中指定的顺序连接。确保以不产生交叉连接(笛卡尔积)的顺序指定表,交叉连接不受支持,并且会导致查询失败。

 
  1. SELECT *

  2. FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

  3.  
  4. SELECT *

  5. FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

  6.  
  7. SELECT *

  8. FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

Note: 对于流查询,计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。 详情请查阅://TODO

Time-windowed Join
Batch Streaming

Note: 时间窗口连接是可以以流方式处理常规的连接子集。

有时间窗的连接至少需要一个等连接字段和一个连接条件,该条件在连接两边限定时间。这种条件可以由两个适当的范围字段(<、<=、>=、>)、一个BETWEEN字段或一个比较同一类型时间属性的等式字段(即,处理时间或事件时间)。

例如,以下字段是有效的窗口连接条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
 
  1. SELECT *

  2. FROM Orders o, Shipments s

  3. WHERE o.id = s.orderId AND

  4. o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

如果订单是在收到订单后4小时发出的,那么上面的示例将把所有订单与其相应的发货连接起来。
Expanding arrays into a relation
Batch Streaming

WITH ORDINALITY 不支持。

 
  1. SELECT users, tag

  2. FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Join with Table Function
Batch Streaming

将表与表函数的结果连接在一起。左边(外部)表的每一行都与表函数的相应调用生成的所有行连接在一起。

必须在此之前注册用户定义的表函数(UDTFs)。有关如何指定和注册UDTFs的详细信息,请参阅://TODO

Inner Join

A row of the left (outer) table is dropped, if its table function call returns an empty result.

 
  1. SELECT users, tag

  2. FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag

Left Outer Join

如果表函数调用返回空结果,则保留相应的外部行,并使用空值填充结果。

 
  1. SELECT users, tag

  2. FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

Note: 目前,只支持literal TRUE作为针对横向表的左外连接的谓词。

Join with Temporal Table
Streaming

时态表是跟踪随时间变化的表。

时态表函数提供对特定时间点时态表状态的访问。使用时态表函数联接表的语法与使用表函数联接的语法相同。

Note: 目前只支持与时态表的内部连接。

假设 Rates 是时态表函数,则连接可以用SQL表示如下:

 
  1. SELECT

  2. o_amount, r_rate

  3. FROM

  4. Orders,

  5. LATERAL TABLE (Rates(o_proctime))

  6. WHERE

  7. r_currency = o_currency

有关更多信息,请查阅://TODO

2.5、Set Operations 

名称 描述
Union
Batch
 
  1. SELECT *

  2. FROM (

  3. (SELECT user FROM Orders WHERE a % 2 = 0)

  4. UNION

  5. (SELECT user FROM Orders WHERE b = 0)

  6. )

UnionAll
Batch Streaming
 
  1. SELECT *

  2. FROM (

  3. (SELECT user FROM Orders WHERE a % 2 = 0)

  4. UNION ALL

  5. (SELECT user FROM Orders WHERE b = 0)

  6. )

Intersect / Except
Batch
 
  1. SELECT *

  2. FROM (

  3. (SELECT user FROM Orders WHERE a % 2 = 0)

  4. INTERSECT

  5. (SELECT user FROM Orders WHERE b = 0)

  6. )

 
  1. SELECT *

  2. FROM (

  3. (SELECT user FROM Orders WHERE a % 2 = 0)

  4. EXCEPT

  5. (SELECT user FROM Orders WHERE b = 0)

  6. )

In
Batch Streaming

如果给定表子查询中存在表达式,则返回true。子查询表必须由一列组成。该列必须具有与表达式相同的数据类型。

 
  1. SELECT user, amount

  2. FROM Orders

  3. WHERE product IN (

  4. SELECT product FROM NewProducts

  5. )

Note: 对于流查询,算子在join和group操作中重写。计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。详情请查阅://TODO

Exists
Batch Streaming

如果子查询至少返回一行,则返回true。只有在可以在联接和组操作中重写算子时,才支持该操作。

 
  1. SELECT user, amount

  2. FROM Orders

  3. WHERE product EXISTS (

  4. SELECT product FROM NewProducts

  5. )

Note:对于流查询,算子在 join 和 group 算子中重写。计算查询结果所需的状态可能会无限增长,这取决于不同输入行的数量。请提供具有有效保留间隔的查询配置,以防止状态过大。有关详细信息,请参见//TODO。

2.6、OrderBy & Limit

名称 说明
Order By
Batch Streaming
Note: 流查询的结果必须主要根据升序时间属性进行排序。还支持其他排序属性。有关时间属性请查阅://TODO
 
  1. SELECT *

  2. FROM Orders

  3. ORDER BY orderTime

Limit
Batch
Note: LIMIT 子句要求有 ORDER BY 子句。
 
  1. SELECT *

  2. FROM Orders

  3. ORDER BY orderTime

  4. LIMIT 3

2.7 Insert

名称 说明
Insert Into
Batch Streaming

输出表必须在TableEnvironment中注册(详情请查阅://TODO)。此外,已注册表的模式必须与查询的模式匹配。

 
  1. INSERT INTO OutputTable

  2. SELECT users, tag

  3. FROM Orders

2.8、Group Windows

group windows 是在 SQL 查询的 Group BY 子句中定义。与使用常规 GROUP BY 子句的查询一样,使用包含 GROUP 窗口函数的 GROUP BY 子句的查询为每个组计算一个结果行。批处理表和流表上的SQL支持以下 group windows 函数。

group 窗口函数 说明
TUMBLE(time_attr, interval)

定义一个滚动时间窗口。翻滚时间窗口将行分配给具有固定持续时间(间隔)的非重叠连续窗口。

例如,一个5分钟滚动窗口以5分钟为间隔将行分组。翻滚窗口可以在事件时间(流+批处理)或处理时间(流)上定义。

HOP(time_attr, interval, interval)

定义一个跳转时间窗口(在 Table API 中称为滑动窗口)。一个跳跃时间窗口有一个固定的持续时间(第二个间隔参数)和一个指定的跳跃间隔(第一个间隔参数)。如果跳转间隔小于窗口大小,则跳转窗口重叠。因此,可以将行分配给多个窗口。

例如,一个15分钟大小的跳转窗口和5分钟的跳转间隔将每一行分配给3个15分钟大小的不同窗口,这些窗口在5分钟的间隔内计算。跳跃窗口可以在事件时间(流+批处理)或处理时间(流)上定义。

SESSION(time_attr, interval) 定义会话时间窗口。会话时间窗口没有固定的持续时间,但它们的边界由不活动的时间间隔定义,即,如果在定义的间隔期间没有出现事件,则关闭会话窗口。

时间属性

对于流表上的SQL查询,group window 函数的 time_attr 参数必须引用一个有效的时间属性,该属性指定行 processing time 或 event time。参见//TODO,了解如何定义时间属性。

对于批处理表上的SQL,组窗口函数的 time_attr 参数必须是 TIMESTAMP 类型的属性。

选择组窗口开始和结束时间戳

组窗口的开始和结束时间戳以及时间属性可以通过以下辅助功能来选择:

方法 描述
TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)
返回相应滚动、跳转或会话窗口的包含下界的时间戳。
TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)

返回相应滚动、跳转或会话窗口的独占上界的时间戳。

注意:在后续基于时间的操作中,例如时间窗口连接和组窗口或窗口聚合,不能将独占的上界时间戳用作rowtime属性。

TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)

返回相应滚动、跳转或会话窗口的包含上限的时间戳。

结果属性是一个 rowtime 属性,可用于后续基于时间的操作,如时间窗口连接和组窗口或窗口聚合。

TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)
返回 proctime 属性,该属性可用于后续基于时间的操作,如时间窗口连接和组窗口或窗口聚合。

注意: 必须使用与 GROUP BY 子句中的 group window 函数完全相同的参数调用辅助函数。

 下面的示例显示如何在流表上使用组窗口指定SQL查询。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
 
// read a DataStream from an external source
val ds: DataStream[(Long, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)
 
// compute SUM(amount) per day (in event-time)
val result1 = tableEnv.sqlQuery(
    """
      |SELECT
      |  user,
      |  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
      |  SUM(amount)
      | FROM Orders
      | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
    """.stripMargin)
// compute SUM(amount) per day (in processing-time)
val result2 = tableEnv.sqlQuery(
  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")
// compute every hour the SUM(amount) of the last 24 hours in event-time
val result3 = tableEnv.sqlQuery(
  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
val result4 = tableEnv.sqlQuery(
    """
      |SELECT
      |  user,
      |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
      |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
      |  SUM(amount)
      | FROM Orders
      | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
    """.stripMargin)
 

2.9、Pattern Recognition

名称 描述
MATCH_RECOGNIZE
Streaming

根据 match_recognition ISO 标准在流表中搜索给定的模式。这使得在SQL查询中表达复杂事件处理(CEP)逻辑成为可能。

有关更详细的描述,请参见//TODO。

 
  1. SELECT T.aid, T.bid, T.cid

  2. FROM MyTable

  3. MATCH_RECOGNIZE (

  4. PARTITION BY userid

  5. ORDER BY proctime

  6. MEASURES

  7. A.id AS aid,

  8. B.id AS bid,

  9. C.id AS cid

  10. PATTERN (A B C)

  11. DEFINE

  12. A AS name = 'a',

  13. B AS name = 'b',

  14. C AS name = 'c'

  15. ) AS T

 

三、DDL

TableEnvironment 的 sqlUpdate()方法指定 DDL。建表成功后该方法不返回任何内容。可以使用 CREATE TABLE 语句将表注册到 Catalog 中,可以在 sqlQuery() 方法中被引用。

注意:Flink 的 DDL 支持尚未完成。包含不支持的 SQL 特性的查询会导致表异常。下面几节列出了批处理表和流表上支持的 SQL DDL 特性。

3.1、指定DDL

下面的示例显示如何指定 SQL DDL。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
 
// SQL query with a registered table
// register a table named "Orders"
tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
// SQL update with a registered table
// register a TableSink
tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH ('connector.path'='/path/to/file' ...)");
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 

3.2、建表

CREATE TABLE [catalog_name.][db_name.]table_name
  [(col_name1 col_type1 [COMMENT col_comment1], ...)]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  WITH (key1=val1, key2=val2, ...)

创建具有给定表属性的表。如果数据库中已经存在同名表,则抛出异常。

PARTITIONED BY

按指定列对创建的表进行分区。如果将该表用作文件系统接收器,则为每个分区创建一个目录。

WITH OPTIONS

用于创建表源/接收器的表属性。这些属性通常用于查找和创建底层连接器。

表达式key1=val1的键和值都应该是字符串。

注意:表名称有三种格式:

1. catalog_name.db_name.table_name:表注册到 metastore,目录名为catalog_name,库名为db_name;

2. db_name.table_name:将表注册到执行表环境和数据库的当前目录为db_name;

3. table_name:表将被注册到执行表环境的当前目录和数据库中。

注意:使用 CREATE TABLE 语句注册的表可以同时用作表源和表接收器,在 DMLs 中引用它之前,我们无法确定它是用作源还是接收器。 

3.3、删表 

DROP TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

删除具有给定表名的表。如果要删除的表不存在,则抛出异常。

IF EXISTS

如果表不存在,则什么也不会发生。

3.4、数据类型

详细的数据类型介绍请查阅 //TODO,一下做一个简单的说明。

泛型类型和(嵌套的)复合类型(例如 POJOs, tuples, rows, Scala case classes)也可以是一行的字段。

可以使用值访问函数访问具有任意嵌套的复合类型的字段。详细的方法请查阅 //TODO

泛型类型被视为一个黑盒,可以由用户定义的函数传递或处理。详情请查阅 //TODO

对于 DDLs,我们支持在页面数据类型中定义的完整数据类型。详细的数据类型请查阅 //TODO

注意:SQL查询中不支持一些数据类型,例如:STRINGBYTESTIME(p) WITHOUT TIME ZONETIME(p) WITH LOCAL TIME ZONETIMESTAMP(p) WITHOUT TIME ZONETIMESTAMP(p) WITH LOCAL TIME ZONEARRAYMULTISETROW。

3.5、保留关键字

虽然还没有实现所有SQL特性,但是一些字符串组合已经被保留为关键字,以供将来使用。如果想使用下列字符串之一作为字段名,请确保用反引号(例如`value``count`)。 

参考:https://blog.csdn.net/zpf_940810653842/category_9074862.html 

相关标签: Flink基础 DDL/DML