Notes on One Way to Integrate Flink with Iceberg
Preface
At present Iceberg only supports Flink 1.11.x for writing Iceberg tables through the DataStream and Table APIs. Since my Hive catalog test has not passed yet (see the referenced issue),
the steps below are recorded with the Hadoop catalog; I will add the Hive catalog results once that testing is done.
I. What is Iceberg?
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.
As a data lake solution, Iceberg supports ACID transactions, updates and deletes (on Spark 2.4.5+), decoupling from any specific compute engine, and dynamic changes to table schema and partitioning.
To support streaming writes, Flink is brought in as the stream-processing framework, with the Iceberg table serving as the terminal sink of the Flink pipeline.
That said, Iceberg does not support row-level updates yet; only INSERT INTO / INSERT OVERWRITE are available. The Iceberg team plans to address this going forward.
In the current Flink-Iceberg integration, a few features are not supported yet:
- Creating tables with hidden partitions
- Creating Iceberg tables with computed columns
- Creating Iceberg tables with watermarks
- Adding, dropping, renaming, or changing columns
- Reading Iceberg tables from Flink in streaming mode
Feature support | Flink 1.11.0 | Notes |
---|---|---|
SQL create catalog | ✔️ | |
SQL create database | ✔️ | |
SQL create table | ✔️ | |
SQL alter table | ✔️ | Only altering table properties is supported (Hive catalog tables only); column/partition-key changes are not supported yet |
SQL drop table | ✔️ | |
SQL select | ✔️ | Only batch mode is supported for now |
SQL insert into | ✔️ | Supports both streaming and batch mode |
SQL insert overwrite | ✔️ | |
DataStream read | ✔️ | See the sketch below |
DataStream append | ✔️ | |
DataStream overwrite | ✔️ | |
Metadata tables | | Not supported yet |
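The table lists DataStream read/append/overwrite as supported. Below is a minimal, untested sketch of the bounded read-then-append path, assuming the TableLoader, FlinkSource, and FlinkSink APIs shipped in iceberg-flink-runtime 0.10.0; the class name, job name, and checkpoint interval are illustrative, and the table path matches the sample table created in Part II:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;

public class IcebergDataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Iceberg sink commits data files on checkpoints
        // (bounded jobs also commit at end of input).
        env.enableCheckpointing(60_000);

        TableLoader loader = TableLoader.fromHadoopTable(
                "hdfs://nameservice1/data/iceberg/iceberg_db/sample_iceberg");

        // Bounded (batch) read; streaming reads are not supported in this version.
        DataStream<RowData> rows = FlinkSource.forRowData()
                .env(env)
                .tableLoader(loader)
                .streaming(false)
                .build();

        // Append the rows back into the same table.
        FlinkSink.forRowData(rows)
                .tableLoader(loader)
                .build();

        env.execute("iceberg-datastream-sketch");
    }
}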
II. Testing Iceberg Table Reads and Writes from the Flink CLI
1. Start the SQL client
The commands are as follows (example):
#flink-1.11.2
./bin/sql-client.sh embedded \
-j /data/flink-1.11.2/lib/iceberg-flink-runtime-0.10.0.jar \
-j /data/flink-1.11.2/lib/flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar \
shell
2. DDL tests
The SQL is as follows (example):
-- 1. Create the hadoop_catalog
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://nameservice1/data/iceberg',
'property-version'='1'
);
[INFO] Catalog has been created.
-- 2. Create a database
Flink SQL> CREATE DATABASE iceberg_db;
[INFO] Database has been created.
Flink SQL> use iceberg_db;
-- 3. Create a non-partitioned table and a partitioned table
Flink SQL> CREATE TABLE sample_iceberg (
> id BIGINT COMMENT 'unique id',
> data STRING
> );
[INFO] Table has been created.
Flink SQL> CREATE TABLE sample_iceberg_partition (
> id BIGINT COMMENT 'unique id',
> data STRING
> ) PARTITIONED BY (data);
[INFO] Table has been created.
insert into sample_iceberg values (1,'test1');
insert into sample_iceberg values (2,'test2');
INSERT into sample_iceberg_partition PARTITION(data='city') SELECT 86;
-- 4. Query the tables from the Flink CLI
Flink CLI query ...
-- 5. Inspect the storage layout on HDFS
[bigdata03:flink-1.11.2]17:16:22$ hadoop fs -ls -r /data/iceberg/iceberg_db/sample_iceberg/metadata
Found 8 items
-rw-r--r-- 2 app_prd supergroup 1 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/version-hint.text
-rw-r--r-- 2 app_prd supergroup 2808 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/v3.metadata.json
-rw-r--r-- 2 app_prd supergroup 1795 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/v2.metadata.json
-rw-r--r-- 2 app_prd supergroup 816 2021-01-07 16:36 /data/iceberg/iceberg_db/sample_iceberg/metadata/v1.metadata.json
-rw-r--r-- 2 app_prd supergroup 3020 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/snap-8264611366906128313-1-b4da00c0-19cc-45da-a386-e13d2a63bb92.avro
-rw-r--r-- 2 app_prd supergroup 3092 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/snap-3280093718175212361-1-d6b2c7d9-08be-4f9e-bc5e-bb302aae6a4e.avro
-rw-r--r-- 2 app_prd supergroup 4580 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/d6b2c7d9-08be-4f9e-bc5e-bb302aae6a4e-m0.avro
-rw-r--r-- 2 app_prd supergroup 4582 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/b4da00c0-19cc-45da-a386-e13d2a63bb92-m0.avro
[bigdata03:flink-1.11.2]17:16:34$ hadoop fs -ls -r /data/iceberg/iceberg_db/sample_iceberg/data
Found 2 items
-rw-r--r-- 2 app_prd supergroup 664 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/data/00000-0-548654c2-77dd-46a5-827a-9a207df1e4f6-00001.parquet
-rw-r--r-- 2 app_prd supergroup 665 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/data/00000-0-42ab18d5-5d71-4efa-9eea-2438cfc4b8f7-00001.parquet
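In this layout, version-hint.text points at the current metadata version, the v*.metadata.json files hold the table metadata, the snap-*.avro files are snapshots, and the *-m0.avro files are manifests; the data directory holds the Parquet data files. As a hedged sketch (assuming the core org.apache.iceberg API bundled in the 0.10.0 runtime jar; the class name is illustrative), the snapshot history can also be inspected programmatically:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class InspectSnapshots {
    public static void main(String[] args) {
        // Load the table straight from its HDFS location (Hadoop catalog layout).
        Table table = new HadoopTables(new Configuration())
                .load("hdfs://nameservice1/data/iceberg/iceberg_db/sample_iceberg");

        // Each committed write produces one snapshot (the snap-*.avro files above).
        for (Snapshot s : table.snapshots()) {
            System.out.printf("snapshot=%d operation=%s summary=%s%n",
                    s.snapshotId(), s.operation(), s.summary());
        }
    }
}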
III. Reading and Writing Iceberg Tables with Programmatic SQL
1. Add the dependency
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.10.0</version>
</dependency>
2. Partial code implementation
// Create the hadoop catalog via the Table API
TableResult tableResult = tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hadoop',\n" +
" 'warehouse'='hdfs://nameservice1/tmp',\n" +
" 'property-version'='1'\n" +
")");
// Switch to the catalog
tenv.useCatalog("hadoop_catalog");
// Create the database
tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");
tenv.useDatabase("iceberg_hadoop_db");
// (Re)create the Iceberg result table; IF EXISTS keeps the first run from failing
tenv.executeSql("drop table if exists hadoop_catalog.iceberg_hadoop_db.iceberg_001");
tenv.executeSql("CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_001 (\n" +
" id BIGINT COMMENT 'unique id',\n" +
" data STRING\n" +
")");
// Test a write
tenv.executeSql("insert into hadoop_catalog.iceberg_hadoop_db.iceberg_001 select 100,'abc'");
3. Create a Hive external table to query the Iceberg table in near real time
hive> add jar /tmp/iceberg-hive-runtime-0.10.0.jar;
hive> CREATE EXTERNAL TABLE tmp.iceberg_001(id bigint,data string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION '/tmp/iceberg_hadoop_db/iceberg_001';
hive> select * from tmp.iceberg_001;
OK
100 abc
1001 abcd
Time taken: 0.535 seconds, Fetched: 2 row(s)
Summary
The following issues surfaced during the tests above:
1. Unbounded streaming data cannot be written with INSERT OVERWRITE into a Hadoop catalog table
Flink SQL> INSERT OVERWRITE sample_iceberg_partition PARTITION(data='city') SELECT 86;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Unbounded data stream doesn't support overwrite operation.
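Per the feature table in Part I, DataStream overwrite is still available for bounded input; a hedged, snippet-style sketch (reusing rows and loader from the DataStream example in Part I) would be:

// Untested sketch: overwrite only works for bounded input, consistent with the
// error above; 'rows' and 'loader' come from the earlier DataStream sketch.
FlinkSink.forRowData(rows)
        .tableLoader(loader)
        .overwrite(true)  // replace the data in the partitions being written
        .build();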
2. The Hadoop catalog does not support renaming tables and similar operations yet; currently only updating table properties and dropping tables are supported
Flink SQL>ALTER TABLE sample_iceberg SET ('write.format.default'='avro');
[INFO] Alter table succeeded!
Flink SQL> ALTER TABLE sample_iceberg RENAME TO sample_iceberg_test;
[ERROR] Could not execute SQL statement. Alter table failed! Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables
Flink SQL> CREATE TABLE sample (
> id BIGINT COMMENT 'unique id',
> data STRING
> );
[INFO] Table has been created.
Flink SQL> show tables;
sample
sample_iceberg
sample_iceberg_partition
Flink SQL> drop table sample;
[INFO] Table has been removed.
Those are the issues I hit while testing. As a follow-up I will try the Hive catalog combined with streaming data consumed from Kafka.
>> End <<