
Data Lake: Integrating Flink and Spark with Iceberg


一、Advantages of Iceberg

  1. Provides ACID transactions; data written upstream becomes visible as soon as it is committed, and upsert / merge into are supported.
  2. Supported compute engines include Spark, Flink, Presto, and Hive.
  3. Supports storage formats such as Parquet, Avro, and ORC.
  4. Incremental reads: incremental data can be consumed as a stream, with support for Structured Streaming and the Flink table source.

二、Installation and Deployment

Versions: Hadoop 2.7.7, Hive 2.3.9, Flink 1.11.1, Spark 3.0.2, Iceberg 0.11.1
Download the dependency JARs:

 iceberg-flink-runtime-0.11.1.jar
 iceberg-hive-metastore-0.11.1.jar
 iceberg-hive-runtime-0.11.1.jar
 iceberg-spark3-0.11.1.jar
 iceberg-spark3-runtime-0.11.1.jar
 iceberg-spark3-extensions-0.11.1.jar

Move each JAR into the lib directory of Spark, Hive, or Flink according to its name.
Set the HADOOP_HOME and HIVE_HOME environment variables globally and add them to PATH:

export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin

Also add this globally:

 export HADOOP_CLASSPATH=$(hadoop classpath)

三、Integrating Iceberg with Flink

1. For convenience the demo uses Flink's sql-client.sh. Add the following catalog configuration to sql-client-defaults.yaml in the conf directory:

catalogs:
  # name of the default catalog
  - name: myhive
    # type of the default catalog
    type: hive
    default-database: default
    # directory containing the Hive configuration files
    hive-conf-dir: /opt/server/apache-hive-2.3.9-bin/conf


2. Copy core-site.xml, hdfs-site.xml, and hive-site.xml into Flink's conf directory, and export the Hadoop classpath:

export HADOOP_CLASSPATH=`hadoop classpath`

3. Start a local Flink cluster, then run the following from the bin directory:

./sql-client.sh embedded \
  -j /opt/server/flink-1.11.1/lib/iceberg-flink-runtime-0.11.1.jar \
  -j /opt/server/flink-1.11.1/lib/flink-sql-connector-hive-2.3.6_2.11-1.11.2.jar shell

# Add -noverify to the startup arguments in sql-client.sh to skip bytecode verification
    # start client without jar
    exec $JAVA_RUN $JVM_ARGS -noverify "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@"
# check if SQL client jar is in /opt
elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then
    # start client with jar
    exec $JAVA_RUN $JVM_ARGS -noverify "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"

4. Flink integration with Iceberg

Run the SQL commands below in the Flink SQL client console:

./sql-client.sh embedded  -j /opt/server/flink-1.11.1/lib/iceberg-flink-runtime-0.11.1.jar -j /opt/server/flink-1.11.1/lib/flink-sql-connector-hive-2.3.6_2.11-1.11.2.jar shell

①. Create a catalog pointing at the Iceberg warehouse location

CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://xxx:8020/user/hive/warehouse/',
'property-version'='1'
);

use catalog hadoop_catalog; 

②. DDL commands

Create a database and a table:

CREATE DATABASE test;

CREATE TABLE hadoop_catalog.test.iceberg_test(
    id BIGINT COMMENT 'unique id',
    data STRING
);

Drop a table:

DROP TABLE hadoop_catalog.test.iceberg_test;

Query (batch mode):

SET execution.type = batch ;
SELECT * FROM hadoop_catalog.test.iceberg_test;

③. DML commands

insert into

insert into hadoop_catalog.test.iceberg_test values (1,'a'),(2,'b'),(3,'c');
insert into iceberg_test select id, data from test.iceberg_test;

insert overwrite

SET execution.type = batch ;
insert overwrite hadoop_catalog.test.iceberg_test values (2,'wangwu');

drop

drop table hadoop_catalog.test.iceberg_test;

Flink SQL does not yet support DELETE or UPDATE statements; deletes and updates have to go through the Iceberg Java API instead. For example, these statements are rejected:

delete from test.iceberg_test where id=3;
update test.iceberg_test set data='lala' where id=3;

Adding, dropping, renaming, and changing columns are not supported either (a Java API sketch for these operations follows below):

alter table test.iceberg_test add columns flag int;
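
Since Flink SQL rejects the statements above in this version, the corresponding operations can be done through the Iceberg Java API. The following is a minimal, hypothetical sketch against the Hadoop-catalog table used above; the table path, the id = 3 filter, and the flag column are just the placeholders from this section, and the class name is made up.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class IcebergTableMaintenance {
    public static void main(String[] args) {
        // load the table directly from its HDFS location (placeholder path from above)
        HadoopTables tables = new HadoopTables(new Configuration());
        Table table = tables.load("hdfs://xxx:8020/user/hive/warehouse/test/iceberg_test");

        // equivalent of "delete from test.iceberg_test where id = 3";
        // note this is a metadata-level delete: only data files whose rows all match
        // the filter are removed, partially matching files are left in place
        table.newDelete()
                .deleteFromRowFilter(Expressions.equal("id", 3))
                .commit();

        // equivalent of "alter table test.iceberg_test add columns flag int"
        table.updateSchema()
                .addColumn("flag", Types.IntegerType.get())
                .commit();
    }
}

Row-level updates (the update ... set data='lala' example) have no single-call equivalent here; in 0.11 they are usually implemented by rewriting the affected data, for example with an overwrite from Spark.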

④. flink2iceberg code

package com.huohua.flink2iceberg;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;


public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // the Iceberg sink commits data files when checkpoints complete, so enable
        // checkpointing or the streaming insert from Kafka below never becomes visible
        env.enableCheckpointing(60 * 1000L);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);
        System.out.println("---> 1. create iceberg hadoop catalog table  .... ");
        // create hadoop catalog
        tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
                "  'type'='iceberg',\n" +
                "  'catalog-type'='hadoop',\n" +
                "  'warehouse'='hdfs://xxx:8020/user/hive/warehouse',\n" +
                "  'property-version'='1'\n" +
                ")");

        // switch to the catalog
        tableEnv.useCatalog("hadoop_catalog");
        // create the database
        tableEnv.executeSql("CREATE DATABASE if not exists hadoop_catalog.iceberg_hadoop_test");
        tableEnv.useDatabase("iceberg_hadoop_test");
        // (re)create the Iceberg result table
        tableEnv.executeSql("drop table if exists hadoop_catalog.iceberg_hadoop_test.iceberg_001");
        TableResult tableResult1 = tableEnv.executeSql("CREATE TABLE  hadoop_catalog.iceberg_hadoop_test.iceberg_001 (\n" +
                "    id BIGINT COMMENT 'unique id',\n" +
                "    data STRING\n" +
                ")");
        tableResult1.print();
        // test the write path
        tableEnv.executeSql("insert into hadoop_catalog.iceberg_hadoop_test.iceberg_001 select 100,'abc'");


        System.out.println("---> 2. create kafka Stream table  .... ");
        String HIVE_CATALOG = "myhive";
        String DEFAULT_DATABASE = "tmp";
        String HIVE_CONF_DIR = "/xx/resources";
        // register a Flink HiveCatalog so the Kafka source table lives in the Hive metastore
        HiveCatalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);
        tableEnv.registerCatalog(HIVE_CATALOG, catalog);
        tableEnv.useCatalog(HIVE_CATALOG);
        // create kafka stream table
        tableEnv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");
        tableEnv.executeSql(
                "CREATE TABLE ods_k_2_iceberg (\n" +
                        " user_id STRING,\n" +
                        " order_amount DOUBLE,\n" +
                        " log_ts TIMESTAMP(3),\n" +
                        " WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n" +
                        ") WITH (\n" +
                        "  'connector'='kafka',\n" +
                        "  'topic'='t_kafka_03',\n" +
                        "  'scan.startup.mode'='latest-offset',\n" +
                        "  'properties.bootstrap.servers'='xx:9092',\n" +
                        "  'properties.group.id' = 'testGroup_01',\n" +
                        "  'format'='json'\n" +
                        ")");


        System.out.println("---> 3. insert into iceberg  table from kafka stream table .... ");
        tableEnv.executeSql(
                "INSERT INTO  hadoop_catalog.iceberg_hadoop_db.iceberg_002 " +
                        " SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");
        // executeSql already submits the INSERT job, so no env.execute() call is needed
    }
}

⑤. Reading an Iceberg table

package com.huohua.flink2iceberg;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://xxx:8020/user/hive/warehouse/test/iceberg_test");
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();

        // Print all records to stdout.
        batch.print();

        // Submit and execute this batch read job.
        env.execute("Test Iceberg Batch Read");
    }
}
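
The same FlinkSource builder can also run in streaming mode and pick up newly committed snapshots incrementally, which is the incremental-read capability listed in section 一. A minimal sketch, assuming the env and tableLoader from the class above; by default the streaming source starts from the table's current state and keeps monitoring it:

        // incremental streaming read: emits rows from snapshots committed after the job starts
        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(true)
                .build();

        stream.print();
        env.execute("Test Iceberg Streaming Read");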

At present there are two ways to apply such DDL changes:

Call the Iceberg Java API (as in the sketch above) and integrate it into your own data platform.
Use a local spark-sql shell as the client for the Iceberg table and change the DDL through spark-sql.


四、Querying Iceberg Tables from Hive

  1. Copy iceberg-hive-metastore-0.11.1.jar and iceberg-hive-runtime-0.11.1.jar into Hive's lib directory.
  2. Create the mapping (external) table:
CREATE EXTERNAL TABLE test.iceberg_hive_1(
`id` int,
`data` string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://xxx:8020/user/hive/warehouse/test/iceberg_test'
TBLPROPERTIES (
'iceberg.mr.catalog'='hadoop',
-- this location must point exactly at the Iceberg table directory
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://xxx:8020/user/hive/warehouse/test/iceberg_test'
);

3. Run the query:

select * from test.iceberg_hive_1;

INSERT and UPDATE operations are not supported from Hive yet.

五、Integrating Spark with Iceberg

Requirement: use Spark 3.0 to read historical data from the offline Hive warehouse and write it into Iceberg with insert into / insert overwrite.

Everything below is done from the spark-sql / spark-shell command line.

1. Copy iceberg-spark3-0.11.1.jar, iceberg-spark3-extensions-0.11.1.jar, and iceberg-spark3-runtime-0.11.1.jar into Spark's jars directory.

2. Copy core-site.xml, hdfs-site.xml, and hive-site.xml into Spark's conf directory.

3. Configure spark-defaults.conf:

# Hive metastore address
spark.hadoop.hive.metastore.uris thrift://localhost:9083
spark.hadoop.hive.metastore.warehouse.dir hdfs://xxx:8020/user/hive/warehouse

# default catalog: a hadoop-type catalog named hadoop_catalog
spark.sql.catalog.hadoop_catalog = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_catalog.type = hadoop
spark.sql.catalog.hadoop_catalog.warehouse = hdfs://xxx:8020/user/hive/warehouse/

spark.sql.warehouse.dir = hdfs://xxx:8020/user/hive/warehouse/
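
If you would rather not edit spark-defaults.conf, the same settings can be passed when building the SparkSession of a standalone job. A minimal sketch in Java, reusing the placeholder host and warehouse path from above (the class and app names are made up; the master is expected to come from spark-submit):

import org.apache.spark.sql.SparkSession;

public class SparkIcebergBootstrap {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("spark-iceberg-demo")
                .enableHiveSupport()
                // same settings as in spark-defaults.conf above
                .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
                .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
                .config("spark.sql.catalog.hadoop_catalog.warehouse", "hdfs://xxx:8020/user/hive/warehouse/")
                .getOrCreate();

        // the Iceberg catalog is now addressable from SQL, e.g. hadoop_catalog.test.iceberg_test
        spark.sql("show databases").show();
        spark.stop();
    }
}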

4. Create a table in Hive:

-- create a Hive database
create database hive_test;

-- create a Hive table
create table hive_test.iceberg_test(id bigint,name string);

-- insert some data
insert into table hive_test.iceberg_test values(1,"zhangsan"),(2,"lisi"),(3,"wangwu");

-- query the data
select * from hive_test.iceberg_test;
OK
1       zhangsan
2       lisi
3       wangwu
Time taken: 0.074 seconds, Fetched: 3 row(s)

5. Launch the shell client:

./bin/spark-shell

-- create the Iceberg table
spark.sql("create table test.iceberg_test (id bigint,name string)").show()

-- read hive_test.iceberg_test from Hive and insert it into the Iceberg table test.iceberg_test
spark.sql("insert into table test.iceberg_test select * from hive_test.iceberg_test").show()

spark.sql("insert overwrite table test.iceberg_test select * from hive_test.iceberg_test").show()

-- query the Iceberg table test.iceberg_test; the data is there
scala> spark.sql("select * from test.iceberg_test").show()
+---+--------+
| id|    name|
+---+--------+
|  1|zhangsan|
|  2|    lisi|
|  3|  wangwu|
+---+--------+

6. Other Iceberg operations

Get the table schema:

spark.table("hadoop_catalog.test.spark_test_table").printSchema()

Read a specific, existing table:

spark.read.format("iceberg").table("hadoop_catalog.test.spark_test_table").show()

spark.sql("select * from hadoop_catalog.test.spark_test_table").show()

Read the data as of a given snapshot:

spark.read.option("snapshot-id", 3372567346381641315L).format("iceberg").load("/doit/iceberg/warehouse/default/tb_test1").show
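
The snapshot id passed to the snapshot-id option has to come from the table's metadata. One way to list the available snapshots is the Iceberg Java API; a minimal sketch against the table path used in the example above (the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class ListSnapshots {
    public static void main(String[] args) {
        // load the table from its warehouse location (same path as the read example above)
        Table table = new HadoopTables(new Configuration())
                .load("/doit/iceberg/warehouse/default/tb_test1");

        // every snapshot id printed here can be used as the "snapshot-id" read option
        for (Snapshot s : table.snapshots()) {
            System.out.println(s.snapshotId() + "\t" + s.timestampMillis());
        }
    }
}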

To be continued!