
Notes on One Way of Implementing Flink with Iceberg


Preface

At the moment Iceberg only supports Flink 1.11.x for writing Iceberg tables through the DataStream and Table APIs. Since the Hive catalog tests have not yet passed (see the referenced ISSUE),
the process below is recorded with the Hadoop catalog instead; it will be supplemented once that testing is complete.

Note: what follows is the main body of the article; the examples below are provided for reference.

I. What is Iceberg?

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

As a data lake solution, Iceberg supports ACID transactions, updates and deletes (on Spark 2.4.5+), independence from any single compute engine, and dynamic changes to table schema and partition layout.
To support streaming writes, Flink is introduced here as the stream-processing framework, with an Iceberg table serving as the terminal Flink sink.
Note, however, that the integration does not yet support row-level UPDATE; only INSERT INTO and INSERT OVERWRITE are available. The Iceberg team plans to address this in the future.

The current Flink/Iceberg integration is still missing a number of features, listed below:

  • Creating tables with hidden partitioning is not supported (see the sketch after the feature table below)
  • Creating Iceberg tables with computed columns is not supported
  • Creating Iceberg tables with watermarks is not supported
  • Adding, dropping, renaming, or altering columns is not supported
  • Reading Iceberg tables in Flink streaming mode is not supported

 

| Feature support       | Flink 1.11.0 | Notes                                                                                                          |
| --------------------- | ------------ | -------------------------------------------------------------------------------------------------------------- |
| SQL create catalog    | ✔️           |                                                                                                                |
| SQL create database   | ✔️           |                                                                                                                |
| SQL create table      | ✔️           |                                                                                                                |
| SQL alter table       | ✔️           | Only altering table properties is supported (Hive catalog tables only); column/partition-key changes are not supported yet |
| SQL drop table        | ✔️           |                                                                                                                |
| SQL select            | ✔️           | Batch mode only for now                                                                                        |
| SQL insert into       | ✔️           | Supports both streaming and batch mode                                                                         |
| SQL insert overwrite  | ✔️           |                                                                                                                |
| DataStream read       | ✔️           |                                                                                                                |
| DataStream append     | ✔️           |                                                                                                                |
| DataStream overwrite  | ✔️           |                                                                                                                |
| Metadata tables       |              |                                                                                                                |
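To make the hidden-partitioning limitation concrete (first bullet above), here is a minimal sketch with made-up table and column names. Flink DDL at this version can only declare identity partitions; Iceberg's hidden-partitioning transforms cannot be expressed in the DDL and, as far as I know, have to be defined through the Iceberg Java API instead:

-- Identity partitioning is expressible in Flink DDL (as in section II below):
CREATE TABLE sample_logs (
    ts TIMESTAMP(3),
    level STRING
) PARTITIONED BY (level);

-- There is no DDL syntax here for hidden-partitioning transforms such as
-- days(ts) or bucket(16, id); a table partitioned that way has to be created
-- outside of Flink, e.g. via PartitionSpec in the Iceberg Java API.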

II. Reading and Writing Iceberg Tables from the Flink SQL CLI

1. Start the SQL client

The command is as follows (example):

#flink-1.11.2
./bin/sql-client.sh embedded \
    -j /data/flink-1.11.2/lib/iceberg-flink-runtime-0.10.0.jar \
    -j /data/flink-1.11.2/lib/flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar \
    shell

2. DDL tests

The statements are as follows (example):

-- 1. Create the hadoop catalog
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nameservice1/data/iceberg',
  'property-version'='1'
);
[INFO] Catalog has been created.

-- 2. Create a database
Flink SQL> CREATE DATABASE iceberg_db;
[INFO] Database has been created.

Flink SQL> use iceberg_db;


-- 3. Create a non-partitioned table and a partitioned table
Flink SQL> CREATE TABLE sample_iceberg (
>     id BIGINT COMMENT 'unique id',
>     data STRING
> );
[INFO] Table has been created.


Flink SQL> CREATE TABLE sample_iceberg_partition (
>     id BIGINT COMMENT 'unique id',
>     data STRING
> ) PARTITIONED BY (data);
[INFO] Table has been created.


insert into sample_iceberg values (1,'test1');
insert into sample_iceberg values (2,'test2');

INSERT into sample_iceberg_partition PARTITION(data='city') SELECT 86;

-- 4. Query the tables from the Flink CLI
...
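As a minimal sketch of step 4 (the actual output was not kept), batch reads of the two tables created above would look like this; recall from the feature table that SELECT currently runs in batch mode only:

SELECT * FROM sample_iceberg;
SELECT * FROM sample_iceberg_partition WHERE data = 'city';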


-- 5. Inspect the storage layout
[bigdata03:flink-1.11.2]17:16:22$ hadoop fs -ls -r /data/iceberg/iceberg_db/sample_iceberg/metadata
Found 8 items
-rw-r--r--   2 app_prd supergroup          1 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/version-hint.text
-rw-r--r--   2 app_prd supergroup       2808 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/v3.metadata.json
-rw-r--r--   2 app_prd supergroup       1795 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/v2.metadata.json
-rw-r--r--   2 app_prd supergroup        816 2021-01-07 16:36 /data/iceberg/iceberg_db/sample_iceberg/metadata/v1.metadata.json
-rw-r--r--   2 app_prd supergroup       3020 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/snap-8264611366906128313-1-b4da00c0-19cc-45da-a386-e13d2a63bb92.avro
-rw-r--r--   2 app_prd supergroup       3092 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/snap-3280093718175212361-1-d6b2c7d9-08be-4f9e-bc5e-bb302aae6a4e.avro
-rw-r--r--   2 app_prd supergroup       4580 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/d6b2c7d9-08be-4f9e-bc5e-bb302aae6a4e-m0.avro
-rw-r--r--   2 app_prd supergroup       4582 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/b4da00c0-19cc-45da-a386-e13d2a63bb92-m0.avro
[bigdata03:flink-1.11.2]17:16:34$ hadoop fs -ls -r /data/iceberg/iceberg_db/sample_iceberg/data
Found 2 items
-rw-r--r--   2 app_prd supergroup        664 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/data/00000-0-548654c2-77dd-46a5-827a-9a207df1e4f6-00001.parquet
-rw-r--r--   2 app_prd supergroup        665 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/data/00000-0-42ab18d5-5d71-4efa-9eea-2438cfc4b8f7-00001.parquet



III. Reading and Writing Iceberg Tables with the Programmatic SQL API

1. Add the dependency

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>0.10.0</version>
        </dependency>

2. Code implementation

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Set up the streaming table environment (Flink 1.11, blink planner)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

// Create the hadoop catalog via the Table API
TableResult tableResult = tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
        "  'type'='iceberg',\n" +
        "  'catalog-type'='hadoop',\n" +
        "  'warehouse'='hdfs://nameservice1/tmp',\n" +
        "  'property-version'='1'\n" +
        ")");

// Switch to the new catalog
tenv.useCatalog("hadoop_catalog");
// Create the database if it does not exist yet, then switch to it
tenv.executeSql("CREATE DATABASE IF NOT EXISTS iceberg_hadoop_db");
tenv.useDatabase("iceberg_hadoop_db");

// (Re)create the Iceberg result table; IF EXISTS avoids a failure on the first run
tenv.executeSql("DROP TABLE IF EXISTS hadoop_catalog.iceberg_hadoop_db.iceberg_001");
tenv.executeSql("CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_001 (\n" +
        "    id BIGINT COMMENT 'unique id',\n" +
        "    data STRING\n" +
        ")");

// Test a write
tenv.executeSql("insert into hadoop_catalog.iceberg_hadoop_db.iceberg_001 select 100,'abc'");

3. Create a Hive external table to query the Iceberg table in real time

hive> add jar /tmp/iceberg-hive-runtime-0.10.0.jar;

hive> CREATE EXTERNAL TABLE tmp.iceberg_001(id bigint,data string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION '/tmp/iceberg_hadoop_db/iceberg_001';


hive> select * from tmp.iceberg_001;
OK
100		abc
1001	abcd
Time taken: 0.535 seconds, Fetched: 2 row(s)



Summary

The following issues came up during the tests above:
1. Unbounded streaming data does not support OVERWRITE writes into a Hadoop catalog table

Flink SQL> INSERT OVERWRITE sample_iceberg_partition PARTITION(data='city') SELECT 86;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Unbounded data stream doesn't support overwrite operation.
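A likely workaround, sketched here but not verified in this test: INSERT OVERWRITE is only defined for bounded (batch) execution, so switching the SQL client into batch mode first should allow the overwrite to run:

-- Switch the SQL client to batch execution mode (Flink 1.11), then overwrite:
SET execution.type = batch;

INSERT OVERWRITE sample_iceberg_partition PARTITION(data='city') SELECT 86;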


2. The Hadoop catalog does not yet support renaming tables and similar operations; currently only setting/updating table properties and dropping tables are supported

Flink SQL>ALTER TABLE sample_iceberg SET ('write.format.default'='avro');
[INFO] Alter table succeeded!

Flink SQL> ALTER TABLE sample_iceberg RENAME TO sample_iceberg_test;
[ERROR] Could not execute SQL statement. Alter table failed! Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables

Flink SQL> CREATE TABLE sample (
>     id BIGINT COMMENT 'unique id',
>     data STRING
> );
[INFO] Table has been created.


Flink SQL> show tables;
sample
sample_iceberg
sample_iceberg_partition

Flink SQL> drop table  sample;
[INFO] Table has been removed.
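For reference, renaming would require a catalog backend that supports it. Per the Iceberg documentation, a Hive catalog is declared as below; the thrift URI is a placeholder, and as noted in the preface this path had not passed testing at the time of writing:

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://nameservice1/user/hive/warehouse',
  'property-version'='1'
);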

Those are the problems I ran into during testing. A follow-up will cover the Hive catalog combined with consuming streaming data from Kafka.
>> End <<