在 Flink 中使用 iceberg
Apache Iceberg 支持Apache Flink的DataStream API 和 Table API 将记录写入 iceberg 的表,当前,我们只提供 iceberg 与 apache flink 1.11.x 的集成支持。
Feature support | Flink 1.11.0 | Notes |
---|---|---|
SQL create catalog | ✔️ | |
SQL create database | ✔️ | |
SQL create table | ✔️ | |
SQL alter table | ✔️ | Only support altering table properties, Columns/PartitionKey changes are not supported now |
SQL drop_table | ✔️ | |
SQL select | ✔️ | Only support batch mode now. |
SQL insert into | ✔️ ️ | Support both streaming and batch mode |
SQL insert overwrite | ✔️ ️ | |
DataStream read | ✔️ ️ | |
DataStream append | ✔️ ️ | |
DataStream overwrite | ✔️ ️ | |
Metadata tables | ️ |
准备
为了在 Flink 中创建 iceberg 表,我们推荐使用Flink SQL Client以便用户更容易理解概念。
第一步,在 apache flinkdownload page页面下载 flink 1.11.x binary 包,我们当前使用 scala 2.12 来实现 apache iceberg-flink-runtime jar,因此推荐使用绑定 scala 2.12 的 flink 1.11。
wget https://downloads.apache.org/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz tar xzvf flink-1.11.1-bin-scala_2.12.tgz
第二步,启动一个基于 Hadoop 环境 的 standalone 模式的 flink 集群。
# HADOOP_HOME is your hadoop root directory after unpack the binary package. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Start the flink standalone cluster
./bin/start-cluster.sh
第三步,启动 flink SQL client。
我们已经在 iceberg 项目中创建独立的 flink-runtime 模块来生成一个捆绑的 jar, 可以直接被 flink SQL client加载。
如果我们要手动构建 flink-runtime 的 jar,那么只需要构建一个 iceberg 工程,它就会在 /flink-runtime/build/libs 下生成 jar。当然,我们也可以从 apache 官方仓库下载 flink-runtime jar。
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell
默认情况下, iceberg 已经包含 hadoop catalog 的 hadoop jars。如果要使用 hive catalog,在打开 flink sql 客户端时,需要加载 hive jars。幸运的是,apache flink 为 sql 客户端提供了一个捆绑的 hive jar。因此,我们可以按如下方式打开 sql 客户端:
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# wget the flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar from the above bundled jar URL firstly.
# open the SQL client.
./bin/sql-client.sh embedded \
-j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar \
-j <hive-bundlded-jar-directory>/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar \
shell
创建 catalogs 和使用 catalogs
Flink 1.11 支持使用 flink sql创建 catalogs。
Hive catalog
创建一个名为hive_catalog
的 iceberg catalog ,用来从 hive metastore 中加载表。
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://nn:8020/warehouse/path'
);
-
type
: 只能使用iceberg,
用于 iceberg 表格式。(必须) -
catalog-type
: Iceberg 当前支持hive
或hadoop
catalog 类型。(必须) -
uri
: Hive metastore 的 thrift URI。 (必须) -
clients
: Hive metastore 客户端池大小,默认值为 2。 (可选) -
property-version
: 版本号来描述属性版本。此属性可用于在属性格式发生更改时进行向后兼容。当前的属性版本是 1。(可选) -
warehouse
: Hive 仓库位置, 如果既不将 hive-conf-dir 设置为指定包含 hive-site.xml 配置文件的位置,也不将正确的 hive-site.xml 添加到类路径,则用户应指定此路径。 -
hive-conf-dir
: 包含 Hive-site.xml 配置文件的目录的路径,该配置文件将用于提供自定义的 Hive 配置值。 如果在创建 iceberg catalog 时同时设置 hive-conf-dir 和 warehouse,那么将使用 warehouse 值覆盖 < hive-conf-dir >/hive-site.xml (或者 classpath 中的 hive 配置文件)中的 hive.metastore.warehouse.dir 的值。
Hadoop catalog
Iceberg 还支持 HDFS 中基于目录的 catalog ,可以使用’catalog-type’='hadoop’进行配置:
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://nn:8020/warehouse/path',
'property-version'='1'
);
-
warehouse
: 用于存储元数据文件和数据文件的 HDFS 目录。(必须)
我们可以执行 sql 命令 USE CATALOG hive _ CATALOG 来设置当前 catalog 。
Custom catalog
通过指定 Catalog-impl 属性,Flink 还支持加载自定义的 Iceberg Catalog 实现。当设置了 catalog-impl 时,将忽略 catalog-type 的值。下面是一个例子:
CREATE CATALOG my_catalog WITH (
'type'='iceberg',
'catalog-impl'='com.my.custom.CatalogImpl',
'my-additional-catalog-config'='my-value'
);
DDL 命令
创建数据库
默认情况下,iceberg 在 flink 中使用 default 数据库。如果我们不想在 default 数据库下创建表,可以使用下面的例子来创建一个单独的数据库:
CREATE DATABASE iceberg_db;
USE iceberg_db;
创建表
CREATE TABLE hive_catalog.default.sample (
id BIGINT COMMENT 'unique id',
data STRING
);
表创建命令支持最常用的 flink create 子句,包括:
-
PARTITION BY (column1, column2, ...)
配置分区,apache flik 还不支持隐藏分区。 -
COMMENT 'table document'
设置一个表描述。 -
WITH ('key'='value', ...)
设置将存储在 apache iceberg 表属性中的表配置。
目前,它不支持计算列、主键和水印定义等。
PARTITIONED BY 分区
要创建分区表,使用 PARTITIONED BY:
CREATE TABLE hive_catalog.default.sample (
id BIGINT COMMENT 'unique id',
data STRING
) PARTITIONED BY (data);
Apache Iceberg 支持隐藏分区,但是 Apache flink 不支持按列上的函数进行分区,所以我们现在没有办法在 flink DDL 中支持隐藏分区,我们将在未来改进 Apache flink DDL。
ALTER TABLE 更改表
Iceberg 现在只支持在 flink 1.11中修改表属性。
ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro')
ALTER TABLE .. RENAME TO
ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;
DROP TABLE 删除表
要删除表,运行:
DROP TABLE hive_catalog.default.sample;
SQL 查询
Iceberg 现在不支持 flink 流式读取,它仍在开发中。但是它支持批量读取以扫描 iceberg 表中的现有记录。
-- Execute the flink job in streaming mode for current session context
SET execution.type = batch ;
SELECT * FROM sample ;
注意: 我们可以执行以下 sql 命令将执行类型从‘ streaming’模式切换到‘ batch’模式,反之亦然:
-- Execute the flink job in streaming mode for current session context
SET execution.type = streaming
-- Execute the flink job in batch mode for current session context
SET execution.type = batch
使用 SQL 编写
Iceberg 在 flink 1.11 中支持 INSERT INTO 和 INSERT OVERWRITE。
INSERT INTO
flink 流作业将新数据追加到表中,使用 INSERT INTO:
INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
INSERT INTO hive_catalog.default.sample SELECT id, data from other_kafka_table;
INSERT OVERWRITE
要使用查询结果替换表中的数据,请在批作业中使用 INSERT OVERWRITE (flink 流作业不支持 INSERT OVERWRITE)。覆盖是 Iceberg 表的原子操作。
具有由 SELECT 查询生成的行的分区将被替换,例如:
INSERT OVERWRITE sample VALUES (1, 'a');
Iceberg 还支持通过 select 值覆盖给定的分区:
INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
对于分区的 Iceberg 表,当所有分区列都在 PARTITION 子句中设置一个值时,它将插入到一个静态分区中,否则,如果部分分区列(所有分区列的前缀部分)在 PARTITION 子句中设置一个值,它将把查询结果写入一个动态分区中。对于未分区的 Iceberg 表,其数据将被 INSERT OVERWRITE 完全覆盖。
DataStream 读数据
Iceberg 现在不支持流式或批量读取,但它正在开发中。
DataStream 写数据
Iceberg 支持从不同的 DataStream 输入写入 Iceberg 表。
Appending data 追加数据
我们支持在本地编写 DataStream < rowdata > 和 DataStream < Row> 到 sink iceberg 表。
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.hadoopConf(hadoopConf)
.build();
env.execute("Test Iceberg DataStream");
Iceberg API 还允许用户将通用的 DataStream < T > 写到 iceberg 表中,在这个单元测试中可以找到更多的例子。
Overwrite data 重写数据
为了动态覆盖现有 Iceberg 表中的数据,我们可以在FlinkSink构建器中设置overwrite标志。
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.overwrite(true)
.hadoopConf(hadoopConf)
.build();
env.execute("Test Iceberg DataStream");
检查表
Iceberg 现在还不支持在 flink sql 中 检查表,需要使用iceberg’s Java API读取 Iceberg 的元数据来获取表信息。
未来的改进
在目前的 flink iceberg 集成工作中,有一些特性我们还不支持:
- 不支持创建带有隐藏分区的 iceberg 表。Discussion在flink 邮件列表中讨论。
- 不支持使用经过计算的列创建 iceberg 表。
- 不支持创建带水印的 iceberg 表。
- 不支持添加列、删除列、重命名列、更改列。FLINK-19062正在追踪这个这个开发计划。
- 不支持在 flink 流模式下读冰山表。#1383 正在追踪这个这个开发计划。
上一篇: 九章算法 | 拼多多面试题:前三大数
推荐阅读
-
在Python中定义和使用抽象类的方法
-
在ASP.NET 2.0中操作数据之四:使用ObjectDataSource展现数据
-
在ASP.NET 2.0中操作数据之十:使用 GridView和DetailView实现的主/从报表
-
在ASP.NET 2.0中操作数据之十二:在GridView控件中使用TemplateField
-
在ASP.NET 2.0中操作数据之十三:在DetailsView控件中使用TemplateField
-
在ASP.NET 2.0中操作数据之十四:使用FormView 的模板
-
在ASP.NET 2.0中操作数据之三十三:基于DataList和Repeater使用DropDownList过滤的主/从报表
-
在ASP.NET 2.0中操作数据之三十五:使用Repeater和DataList单页面实现主/从报表
-
java 在Jetty9中使用HttpSessionListener和Filter
-
在ASP.NET 2.0中操作数据之三十一:使用DataList来一行显示多条记录