欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Flink实战(六) - Table API & SQL编程

程序员文章站 2022-05-29 10:09:31
1 意义 1.1 分层的 APIs & 抽象层次 Flink提供三层API。 每个API在简洁性和表达性之间提供不同的权衡,并针对不同的用例。 而且Flink提供不同级别的抽象来开发流/批处理应用程序 最低级抽象只提供有状态流。它通过Process Function嵌入到DataStream API ......

1 意义

1.1 分层的 apis & 抽象层次

flink提供三层api。 每个api在简洁性和表达性之间提供不同的权衡,并针对不同的用例。

Flink实战(六) - Table API & SQL编程

而且flink提供不同级别的抽象来开发流/批处理应用程序

Flink实战(六) - Table API & SQL编程

  • 最低级抽象只提供有状态流。它通过process function嵌入到datastream api中。它允许用户*处理来自一个或多个流的事件,并使用一致的容错状态。此外,用户可以注册事件时间和处理时间回调,允许程序实现复杂的计算。
  • 实际上,大多数应用程序不需要上述低级抽象,而是针对core api编程, 如datastream api(有界/*流)和dataset api (有界数据集)。这些流畅的api提供了用于数据处理的通用构建块,例如各种形式的用户指定的转换,连接,聚合,窗口,状态等。在这些api中处理的数据类型在相应的编程语言中表示为类。
    低级process function与datastream api集成,因此只能对某些 算子操作进行低级抽象。该数据集api提供的有限数据集的其他原语,如循环/迭代。
  • table api 是为中心的声明性dsl 表,其可被动态地改变的表(表示流时)。该 table api遵循(扩展)关系模型:表有一个模式连接(类似于在关系数据库中的表)和api提供可比的 算子操作,如选择,项目,连接,分组依据,聚合等 table api程序以声明方式定义应该执行的逻辑 算子操作,而不是准确指定 算子操作代码的外观。虽然 table api可以通过各种类型的用户定义函数进行扩展,但它的表现力不如core api,但使用更简洁(编写的代码更少)。此外, table api程序还会通过优化程序,在执行之前应用优化规则。
    可以在表和datastream / dataset之间无缝转换,允许程序混合 table api以及datastream 和dataset api。
  • flink提供的*抽象是sql。这种抽象在语义和表达方面类似于 table api,但是将程序表示为sql查询表达式。在sql抽象与 table api紧密地相互作用,和sql查询可以通过定义表来执行 table api。1.2 模型类比mapreduce ==> hive sql
    spark ==> spark sql
    flink ==> sql

2 总览

2.1 简介

apache flink具有两个关系型api

  • table api
  • sql

用于统一流和批处理

table api是scala和java语言集成查询api,可以非常直观的方式组合来自关系算子的查询(e.g. 选择,过滤和连接).

flink的sql支持基于实现sql标准的apache calcite。无论输入是批输入(dataset)还是流输入(datastream),任一接口中指定的查询都具有相同的语义并指定相同的结果。

table api和sql接口彼此紧密集成,就如flink的datastream和dataset api。我们可以轻松地在基于api构建的所有api和库之间切换。例如,可以使用cep库从datastream中提取模式,然后使用 table api分析模式,或者可以在预处理上运行gelly图算法之前使用sql查询扫描,过滤和聚合批处理表数据。

table api和sql尚未完成并且正在积极开发中。并非 table api,sql和stream,batch输入的每种组合都支持所有算子操作

2.2 依赖结构

所有table api和sql组件都捆绑在flink-table maven工件中。

以下依赖项与大多数项目相关:

  • flink-table-common
    通过自定义函数,格式等扩展表生态系统的通用模块。
  • flink-table-api-java
    使用java编程语言的纯表程序的表和sql api(在早期开发阶段,不推荐!)。
  • flink-table-api-scala
    使用scala编程语言的纯表程序的表和sql api(在早期开发阶段,不推荐!)。
  • flink-table-api-java-bridge
    使用java编程语言支持datastream / dataset api的table&sql api。
  • flink-table-api-scala-bridge
    使用scala编程语言支持datastream / dataset api的table&sql api。
  • flink-table-planner
    表程序规划器和运行时。
  • flink-table-uber
    将上述模块打包成大多数table&sql api用例的发行版。 uber jar文件flink-table * .jar位于flink版本的/ opt目录中,如果需要可以移动到/ lib。

2.3 项目依赖

必须将以下依赖项添加到项目中才能使用table api和sql来定义管道:

<dependency>
  <groupid>org.apache.flink</groupid>
  <artifactid>flink-table-planner_2.11</artifactid>
  <version>1.8.0</version>
</dependency>

此外,根据目标编程语言,您需要添加java或scala api。

<!-- either... -->
<dependency>
  <groupid>org.apache.flink</groupid>
  <artifactid>flink-table-api-java-bridge_2.11</artifactid>
  <version>1.8.0</version>
</dependency>
<!-- or... -->
<dependency>
  <groupid>org.apache.flink</groupid>
  <artifactid>flink-table-api-scala-bridge_2.11</artifactid>
  <version>1.8.0</version>
</dependency>

在内部,表生态系统的一部分是在scala中实现的。 因此,请确保为批处理和流应用程序添加以下依赖项:

<dependency>
  <groupid>org.apache.flink</groupid>
  <artifactid>flink-streaming-scala_2.11</artifactid>
  <version>1.8.0</version>
</dependency>

2.4 扩展依赖

如果要实现与kafka或一组用户定义函数交互的自定义格式,以下依赖关系就足够了,可用于sql客户端的jar文件:

<dependency>
  <groupid>org.apache.flink</groupid>
  <artifactid>flink-table-common</artifactid>
  <version>1.8.0</version>
</dependency>

目前,该模块包括以下扩展点:

  • serializationschemafactory
  • deserializationschemafactory
  • scalarfunction
  • tablefunction
  • aggregatefunction

3 概念和通用api

table api和sql集成在一个联合api中。此api的核心概念是table用作查询的输入和输出。本文档显示了具有 table api和sql查询的程序的常见结构,如何注册table,如何查询table以及如何发出table。

3.1 table api和sql程序的结构

批处理和流式传输的所有 table api和sql程序都遵循相同的模式。以下代码示例显示了 table api和sql程序的常见结构。

// 对于批处理程序,使用executionenvironment而不是streamexecutionenvironment
streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

// 创建一个tableenvironment
// 对于批处理程序使用batchtableenvironment而不是streamtableenvironment
streamtableenvironment tableenv = streamtableenvironment.create(env);

// 注册一个 table
tableenv.registertable("table1", ...)            // 或者
tableenv.registertablesource("table2", ...);     // 或者
tableenv.registerexternalcatalog("extcat", ...);
// 注册一个输出 table
tableenv.registertablesink("outputtable", ...);

/ 从 table api query 创建一个table
table tapiresult = tableenv.scan("table1").select(...);
// 从 sql query 创建一个table
table sqlresult  = tableenv.sqlquery("select ... from table2 ... ");

// 将表api结果表发送到tablesink,对于sql结果也是如此
tapiresult.insertinto("outputtable");

// 执行
env.execute();

3.2 将datastream或dataset转换为表

它也可以直接转换为a 而不是注册a datastream或datasetin 。如果要在 table api查询中使用table,这很方便。tableenvironmenttable

// 获取streamtableenvironment
//在batchtableenvironment中注册dataset是等效的
streamtableenvironment tableenv = streamtableenvironment.create(env);

datastream<tuple2<long, string>> stream = ...

// 将datastream转换为默认字段为“f0”,“f1”的表
table table1 = tableenv.fromdatastream(stream);

// 将datastream转换为包含字段“mylong”,“mystring”的表
table table2 = tableenv.fromdatastream(stream, "mylong, mystring");
  • sale.csv文件
    Flink实战(六) - Table API & SQL编程
  • scala
    Flink实战(六) - Table API & SQL编程
  • java
    Flink实战(六) - Table API & SQL编程

还不完善,等日后flink该模块开发完毕再深入研究!

参考

table api & sql