欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

在 Flink 中使用 iceberg

程序员文章站 2022-07-14 12:22:07
...

Apache Iceberg 支持Apache Flink的DataStream API 和 Table API 将记录写入 iceberg 的表,当前,我们只提供 iceberg 与 apache flink 1.11.x  的集成支持。

Feature support Flink 1.11.0 Notes
SQL create catalog ✔️
SQL create database ✔️
SQL create table ✔️
SQL alter table ✔️ Only support altering table properties, Columns/PartitionKey changes are not supported now
SQL drop_table ✔️
SQL select ✔️ Only support batch mode now.
SQL insert into ✔️ ️ Support both streaming and batch mode
SQL insert overwrite ✔️ ️
DataStream read ✔️ ️
DataStream append ✔️ ️
DataStream overwrite ✔️ ️
Metadata tables

准备

为了在 Flink 中创建 iceberg 表,我们推荐使用Flink SQL Client以便用户更容易理解概念。

第一步,在 apache flinkdownload page页面下载 flink 1.11.x binary 包,我们当前使用 scala 2.12 来实现 apache iceberg-flink-runtime jar,因此推荐使用绑定 scala 2.12 的 flink 1.11。

wget https://downloads.apache.org/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.12.tgz tar xzvf flink-1.11.1-bin-scala_2.12.tgz

第二步,启动一个基于 Hadoop 环境 的 standalone 模式的 flink 集群。

# HADOOP_HOME is your hadoop root directory after unpack the binary package. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Start the flink standalone cluster
./bin/start-cluster.sh

第三步,启动 flink SQL client。

我们已经在 iceberg 项目中创建独立的 flink-runtime 模块来生成一个捆绑的 jar, 可以直接被 flink SQL client加载。

如果我们要手动构建 flink-runtime 的 jar,那么只需要构建一个 iceberg 工程,它就会在 /flink-runtime/build/libs 下生成 jar。当然,我们也可以从 apache 官方仓库下载 flink-runtime jar。

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell

默认情况下, iceberg 已经包含 hadoop catalog 的 hadoop jars。如果要使用 hive catalog,在打开 flink sql 客户端时,需要加载 hive jars。幸运的是,apache flink 为 sql 客户端提供了一个捆绑的 hive jar。因此,我们可以按如下方式打开 sql 客户端:

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# wget the flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar from the above bundled jar URL firstly.
# open the SQL client.
./bin/sql-client.sh embedded \
    -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar \
    -j <hive-bundlded-jar-directory>/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar \
    shell

创建 catalogs 和使用 catalogs

Flink 1.11 支持使用 flink sql创建 catalogs。

Hive catalog

创建一个名为hive_catalog的 iceberg catalog ,用来从 hive metastore 中加载表。

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
  • type: 只能使用iceberg,用于 iceberg 表格式。(必须)
  • catalog-type: Iceberg 当前支持hivehadoopcatalog 类型。(必须)
  • uri: Hive metastore 的 thrift URI。 (必须)
  • clients: Hive metastore 客户端池大小,默认值为 2。 (可选)
  • property-version: 版本号来描述属性版本。此属性可用于在属性格式发生更改时进行向后兼容。当前的属性版本是 1。(可选)
  • warehouse: Hive 仓库位置, 如果既不将 hive-conf-dir 设置为指定包含 hive-site.xml 配置文件的位置,也不将正确的 hive-site.xml 添加到类路径,则用户应指定此路径。
  • hive-conf-dir: 包含 Hive-site.xml 配置文件的目录的路径,该配置文件将用于提供自定义的 Hive 配置值。 如果在创建 iceberg catalog 时同时设置 hive-conf-dir 和 warehouse,那么将使用 warehouse 值覆盖 < hive-conf-dir >/hive-site.xml (或者 classpath 中的 hive 配置文件)中的 hive.metastore.warehouse.dir 的值。

Hadoop catalog

Iceberg 还支持 HDFS 中基于目录的 catalog ,可以使用’catalog-type’='hadoop’进行配置:

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
  • warehouse: 用于存储元数据文件和数据文件的 HDFS 目录。(必须)

我们可以执行 sql 命令 USE CATALOG hive _ CATALOG 来设置当前 catalog 。

Custom catalog

通过指定 Catalog-impl 属性,Flink 还支持加载自定义的 Iceberg Catalog 实现。当设置了 catalog-impl 时,将忽略 catalog-type 的值。下面是一个例子:

CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);

DDL 命令

创建数据库

默认情况下,iceberg 在 flink 中使用 default 数据库。如果我们不想在 default 数据库下创建表,可以使用下面的例子来创建一个单独的数据库:

CREATE DATABASE iceberg_db;
USE iceberg_db;

创建表

CREATE TABLE hive_catalog.default.sample (
    id BIGINT COMMENT 'unique id',
    data STRING
);

表创建命令支持最常用的 flink create 子句,包括:

  • PARTITION BY (column1, column2, ...)配置分区,apache flik 还不支持隐藏分区。
  • COMMENT 'table document'设置一个表描述。
  • WITH ('key'='value', ...)设置将存储在 apache iceberg 表属性中的表配置。

目前,它不支持计算列、主键和水印定义等。

PARTITIONED BY 分区

要创建分区表,使用 PARTITIONED BY:

CREATE TABLE hive_catalog.default.sample (
    id BIGINT COMMENT 'unique id',
    data STRING
) PARTITIONED BY (data);

Apache Iceberg 支持隐藏分区,但是 Apache flink 不支持按列上的函数进行分区,所以我们现在没有办法在 flink DDL 中支持隐藏分区,我们将在未来改进 Apache flink DDL。

ALTER TABLE 更改表

Iceberg 现在只支持在 flink 1.11中修改表属性。

ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro')

ALTER TABLE .. RENAME TO

ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;

DROP TABLE 删除表

要删除表,运行:

DROP TABLE hive_catalog.default.sample;

SQL 查询

Iceberg 现在不支持 flink 流式读取,它仍在开发中。但是它支持批量读取以扫描 iceberg 表中的现有记录。

-- Execute the flink job in streaming mode for current session context
SET execution.type = batch ;
SELECT * FROM sample       ;

注意: 我们可以执行以下 sql 命令将执行类型从‘ streaming’模式切换到‘ batch’模式,反之亦然:

-- Execute the flink job in streaming mode for current session context
SET execution.type = streaming
-- Execute the flink job in batch mode for current session context
SET execution.type = batch

使用 SQL 编写

Iceberg 在 flink 1.11 中支持 INSERT INTO 和 INSERT OVERWRITE。

INSERT INTO

flink 流作业将新数据追加到表中,使用 INSERT INTO:

INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
INSERT INTO hive_catalog.default.sample SELECT id, data from other_kafka_table;

INSERT OVERWRITE

要使用查询结果替换表中的数据,请在批作业中使用 INSERT OVERWRITE (flink 流作业不支持 INSERT OVERWRITE)。覆盖是 Iceberg 表的原子操作。

具有由 SELECT 查询生成的行的分区将被替换,例如:

INSERT OVERWRITE sample VALUES (1, 'a');

Iceberg 还支持通过 select 值覆盖给定的分区:

INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;

对于分区的 Iceberg 表,当所有分区列都在 PARTITION 子句中设置一个值时,它将插入到一个静态分区中,否则,如果部分分区列(所有分区列的前缀部分)在 PARTITION 子句中设置一个值,它将把查询结果写入一个动态分区中。对于未分区的 Iceberg 表,其数据将被 INSERT OVERWRITE 完全覆盖。

DataStream 读数据

Iceberg 现在不支持流式或批量读取,但它正在开发中。

DataStream 写数据

Iceberg 支持从不同的 DataStream 输入写入 Iceberg 表。

Appending data 追加数据

我们支持在本地编写 DataStream < rowdata > 和 DataStream < Row> 到 sink iceberg 表。

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .hadoopConf(hadoopConf)
    .build();
env.execute("Test Iceberg DataStream");

Iceberg API 还允许用户将通用的 DataStream < T > 写到 iceberg 表中,在这个单元测试中可以找到更多的例子。

Overwrite data 重写数据

为了动态覆盖现有 Iceberg 表中的数据,我们可以在FlinkSink构建器中设置overwrite标志。

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadooptable("hdfs://nn:8020/warehouse/path");
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .hadoopConf(hadoopConf)
    .build();
env.execute("Test Iceberg DataStream");

检查表

Iceberg 现在还不支持在 flink sql 中 检查表,需要使用iceberg’s Java API读取 Iceberg 的元数据来获取表信息。

未来的改进

在目前的 flink iceberg 集成工作中,有一些特性我们还不支持:

  • 不支持创建带有隐藏分区的 iceberg 表。Discussion在flink 邮件列表中讨论。
  • 不支持使用经过计算的列创建 iceberg 表。
  • 不支持创建带水印的 iceberg 表。
  • 不支持添加列、删除列、重命名列、更改列。FLINK-19062正在追踪这个这个开发计划。
  • 不支持在 flink 流模式下读冰山表。#1383 正在追踪这个这个开发计划。