Data Lake: Integrating Flink and Spark with Iceberg
一、Advantages of Iceberg
- Provides ACID transactions: data written upstream becomes visible as soon as it is committed, and upsert / MERGE INTO are supported (a hedged sketch follows this list)
- Supported compute engines: Spark, Flink, Presto, and Hive
- Supports storage formats such as Parquet, Avro, and ORC
- Incremental read capability: incremental data can be consumed as a stream, with support for Structured Streaming and the Flink table source
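As a rough illustration of the upsert / MERGE INTO point: the Spark 3 SQL extensions expose MERGE INTO on Iceberg tables. This is only a minimal sketch, assuming a SparkSession configured with the Iceberg extensions and the hadoop_catalog described in section 五; the target and updates table names are hypothetical.
import org.apache.spark.sql.SparkSession;

public class MergeIntoSketch {
    public static void main(String[] args) {
        // Assumes spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
        // and a hadoop_catalog are configured, e.g. via spark-defaults.conf (see section 五).
        SparkSession spark = SparkSession.builder()
                .appName("iceberg-merge-into-sketch")
                .getOrCreate();

        // Hypothetical tables: target holds the current rows, updates holds new/changed rows.
        spark.sql("MERGE INTO hadoop_catalog.test.target t " +
                "USING hadoop_catalog.test.updates u " +
                "ON t.id = u.id " +
                "WHEN MATCHED THEN UPDATE SET t.data = u.data " +
                "WHEN NOT MATCHED THEN INSERT *");

        spark.stop();
    }
}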
二、Installation and deployment
Versions: Hadoop 2.7.7, Hive 2.3.9, Flink 1.11.1, Spark 3.0.2, Iceberg 0.11.1
Download the dependency jars:
iceberg-flink-runtime-0.11.1.jar
iceberg-hive-metastore-0.11.1.jar
iceberg-hive-runtime-0.11.1.jar
iceberg-spark3-0.11.1.jar
iceberg-spark3-runtime-0.11.1.jar
iceberg-spark3-extensions-0.11.1.jar
Move each jar into the lib directory of Spark, Hive, or Flink according to its name.
Add the global environment variables HADOOP_HOME and HIVE_HOME, and put them on the PATH:
export PATH=$HADOOP_HOME/bin:$HIVE_HOME/bin:$PATH
Also add the global configuration:
export HADOOP_CLASSPATH=$(hadoop classpath)
三、Integrating Iceberg with Flink
1. For convenience the demo uses Flink's sql-client.sh. Add the following catalog configuration to sql-client-defaults.yaml in the conf directory:
catalogs:
  # default catalog name
  - name: myhive
    # default catalog type
    type: hive
    default-database: default
    # directory containing the hive configuration files
    hive-conf-dir: /opt/server/apache-hive-2.3.9-bin/conf
2. Copy core-site.xml, hdfs-site.xml, and hive-site.xml into the conf directory, and export the Hadoop classpath:
export HADOOP_CLASSPATH=`hadoop classpath`
3. Start a local Flink cluster, then launch the SQL client from the bin directory:
./sql-client.sh embedded -j /opt/server/flink-1.11.1/lib/iceberg-flink-runtime-0.11.1.jar \
  -j /opt/server/flink-1.11.1/lib/flink-sql-connector-hive-2.3.6_2.11-1.11.2.jar shell
# Add -noverify to the launch arguments in sql-client.sh to skip bytecode verification:
# start client without jar
exec $JAVA_RUN $JVM_ARGS -noverify "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@"
# check if SQL client jar is in /opt
elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then
    # start client with jar
    exec $JAVA_RUN $JVM_ARGS -noverify "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"
4. Flink + Iceberg usage
Run the following SQL statements in the Flink SQL client:
./sql-client.sh embedded -j /opt/server/flink-1.11.1/lib/iceberg-flink-runtime-0.11.1.jar -j /opt/server/flink-1.11.1/lib/flink-sql-connector-hive-2.3.6_2.11-1.11.2.jar shell
①. Create a catalog and specify where Iceberg data is stored
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://xxx:8020/user/hive/warehouse/',
'property-version'='1'
);
use catalog hadoop_catalog;
②. DDL statements
Create a database and a table
CREATE DATABASE test;
CREATE TABLE hadoop_catalog.test.iceberg_test(
id BIGINT COMMENT 'unique id',
data STRING
);
Drop a table
DROP TABLE hadoop_catalog.test.iceberg_test;
Query (batch mode)
SET execution.type = batch;
SELECT * FROM hadoop_catalog.test.iceberg_test;
③. DML statements
insert into
insert into hadoop_catalog.test.iceberg_test values (1,'a'),(2,'b'),(3,'c');
insert into iceberg_test select id, data from test.iceberg_test;
insert overwrite
SET execution.type = batch;
insert overwrite hadoop_catalog.test.iceberg_test values (2,'wangwu');
drop
drop table hadoop_catalog.test.iceberg_test;
Flink SQL does not yet support DELETE or UPDATE statements, so operations like the following have to go through the Iceberg Java API instead:
delete from test.iceberg_test where id=3;
update test.iceberg_test set data='lala' where id=3;
Adding, dropping, renaming, and changing columns are not supported either:
alter table test.iceberg_test add columns flag int;
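For the delete case, here is a minimal Java sketch using the Iceberg API, assuming the hadoop catalog warehouse path from the CREATE CATALOG statement above. Note that in Iceberg 0.11 newDelete() with a row filter is a metadata-level delete: it only removes data files whose rows all match the expression.
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class IcebergDeleteSketch {
    public static void main(String[] args) {
        // Warehouse path taken from the hadoop_catalog definition above.
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(),
                "hdfs://xxx:8020/user/hive/warehouse/");
        Table table = catalog.loadTable(TableIdentifier.of("test", "iceberg_test"));

        // "delete from test.iceberg_test where id=3" expressed through the Java API.
        // This only succeeds if the matching rows sit in data files of their own.
        table.newDelete()
                .deleteFromRowFilter(Expressions.equal("id", 3L))
                .commit();
    }
}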
④. flink2iceberg code
package com.huohua.flink2iceberg;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        System.out.println("---> 1. create iceberg hadoop catalog table .... ");
        // create hadoop catalog
        tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
                " 'type'='iceberg',\n" +
                " 'catalog-type'='hadoop',\n" +
                " 'warehouse'='hdfs://xxx:8020/user/hive/warehouse',\n" +
                " 'property-version'='1'\n" +
                ")");
        // switch to the hadoop catalog
        tableEnv.useCatalog("hadoop_catalog");
        // create the database
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hadoop_catalog.iceberg_hadoop_test");
        tableEnv.useDatabase("iceberg_hadoop_test");
        // recreate the iceberg result table
        tableEnv.executeSql("DROP TABLE IF EXISTS hadoop_catalog.iceberg_hadoop_test.iceberg_001");
        TableResult tableResult1 = tableEnv.executeSql("CREATE TABLE hadoop_catalog.iceberg_hadoop_test.iceberg_001 (\n" +
                " id BIGINT COMMENT 'unique id',\n" +
                " data STRING\n" +
                ")");
        tableResult1.print();
        // test write
        tableEnv.executeSql("insert into hadoop_catalog.iceberg_hadoop_test.iceberg_001 select 100, 'abc'");

        System.out.println("---> 2. create kafka Stream table .... ");
        String HIVE_CATALOG = "myhive";
        String DEFAULT_DATABASE = "tmp";
        String HIVE_CONF_DIR = "/xx/resources";
        // register Flink's Hive catalog (flink-connector-hive) so the Kafka source table
        // can be stored in the Hive metastore
        HiveCatalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);
        tableEnv.registerCatalog(HIVE_CATALOG, catalog);
        tableEnv.useCatalog("myhive");
        // create kafka stream table
        tableEnv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");
        tableEnv.executeSql(
                "CREATE TABLE ods_k_2_iceberg (\n" +
                        " user_id STRING,\n" +
                        " order_amount DOUBLE,\n" +
                        " log_ts TIMESTAMP(3),\n" +
                        " WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n" +
                        ") WITH (\n" +
                        " 'connector'='kafka',\n" +
                        " 'topic'='t_kafka_03',\n" +
                        " 'scan.startup.mode'='latest-offset',\n" +
                        " 'properties.bootstrap.servers'='xx:9092',\n" +
                        " 'properties.group.id' = 'testGroup_01',\n" +
                        " 'format'='json'\n" +
                        ")");

        System.out.println("---> 3. insert into iceberg table from kafka stream table .... ");
        // iceberg_002 is assumed to already exist with a matching (user_id, order_amount, dt) schema.
        // The INSERT submits its own streaming job, so no extra env.execute() is needed.
        tableEnv.executeSql(
                "INSERT INTO hadoop_catalog.iceberg_hadoop_test.iceberg_002 " +
                        " SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");
    }
}
⑤. Reading from Iceberg
package com.huohua.flink2iceberg;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://xxx:8020/user/hive/warehouse/test/iceberg_test");
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();

        // Print all records to stdout.
        batch.print();

        // Submit and execute this batch read job.
        env.execute("Test Iceberg Batch Read");
    }
}
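The incremental read capability mentioned in section 一 uses the same builder in streaming mode. A minimal sketch, assuming the streaming options of FlinkSource in 0.11.1 behave as shown; the commented-out start snapshot id is a placeholder:
package com.huohua.flink2iceberg;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadIcebergStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://xxx:8020/user/hive/warehouse/test/iceberg_test");

        // streaming(true) keeps the source running and emits rows committed by new snapshots.
        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(true)
                // .startSnapshotId(1L)  // optionally skip history before a given snapshot (placeholder id)
                .build();

        stream.print();
        env.execute("Test Iceberg Streaming Read");
    }
}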
DDL changes on Iceberg tables can currently be made in two ways:
- Call the Iceberg Java API to apply the DDL update and integrate that into your own data platform (a sketch follows this list).
- Use a local spark-sql shell as the client of the Iceberg table and run the DDL changes through spark-sql.
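A minimal sketch of the first option, assuming the hadoop catalog warehouse path from section 三; it applies the flag column from the unsupported ALTER TABLE example through UpdateSchema:
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;

public class IcebergSchemaUpdateSketch {
    public static void main(String[] args) {
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(),
                "hdfs://xxx:8020/user/hive/warehouse/");
        Table table = catalog.loadTable(TableIdentifier.of("test", "iceberg_test"));

        // "alter table test.iceberg_test add columns flag int" via the Java API;
        // renameColumn / deleteColumn / updateColumn are available on the same builder.
        table.updateSchema()
                .addColumn("flag", Types.IntegerType.get())
                .commit();
    }
}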
四、Querying Iceberg tables from Hive
- Copy iceberg-hive-metastore-0.11.1.jar and iceberg-hive-runtime-0.11.1.jar into Hive's lib directory.
- Create a mapping (external) table:
CREATE EXTERNAL TABLE test.iceberg_hive_1(
  `id` int,
  `data` string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://xxx:8020/user/hive/warehouse/test/iceberg_test'
TBLPROPERTIES (
  'iceberg.mr.catalog'='hadoop',
  -- this location must point exactly at the Iceberg table directory
  'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://xxx:8020/user/hive/warehouse/test/iceberg_test'
);
- Run a query:
select * from test.iceberg_hive_1;
INSERT and UPDATE operations are not supported from Hive yet.
五、Integrating Spark with Iceberg
Requirement: use Spark 3.0 to read historical data from the offline Hive warehouse and write it into Iceberg with INSERT INTO / INSERT OVERWRITE, working from the spark-sql and spark-shell command lines.
1. Copy iceberg-spark3-0.11.1.jar, iceberg-spark3-extensions-0.11.1.jar, and iceberg-spark3-runtime-0.11.1.jar into Spark's jars directory.
2. Copy core-site.xml, hdfs-site.xml, and hive-site.xml into Spark's conf directory.
3. Configure spark-defaults.conf:
# hive metastore address
spark.hadoop.hive.metastore.uris thrift://localhost:9083
spark.hadoop.hive.metastore.warehouse.dir hdfs://xxx:8020/user/hive/warehouse
# default catalog settings: hadoop_catalog is a hadoop-type catalog
spark.sql.catalog.hadoop_catalog = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_catalog.type = hadoop
spark.sql.catalog.hadoop_catalog.warehouse = hdfs://xxx:8020/user/hive/warehouse/
spark.sql.warehouse.dir = hdfs://xxx:8020/user/hive/warehouse/
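The same catalog configuration can also be set programmatically when building the session. A hedged Java sketch equivalent to the spark-defaults.conf entries above; host names and ports are placeholders, and the spark.sql.extensions line is an extra assumption matching the deployed iceberg-spark3-extensions jar:
import org.apache.spark.sql.SparkSession;

public class SparkIcebergSessionSketch {
    public static void main(String[] args) {
        // Mirrors the spark-defaults.conf entries above.
        SparkSession spark = SparkSession.builder()
                .appName("spark-iceberg-session-sketch")
                .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
                .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
                .config("spark.sql.catalog.hadoop_catalog.warehouse", "hdfs://xxx:8020/user/hive/warehouse/")
                .config("spark.sql.extensions",
                        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .enableHiveSupport()
                .getOrCreate();

        // Same query as in step 4 below, now issued from code.
        spark.sql("select * from hive_test.iceberg_test").show();
        spark.stop();
    }
}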
4. Create a Hive table
-- create the hive database
create database hive_test;
-- create the hive table
create table hive_test.iceberg_test(id bigint,name string);
-- insert data
insert into table hive_test.iceberg_test values(1,"zhangsan"),(2,"lisi"),(3,"wangwu");
-- query the data
select * from hive_test.iceberg_test;
OK
1 zhangsan
2 lisi
3 wangwu
Time taken: 0.074 seconds, Fetched: 3 row(s)
5. Start the spark-shell client:
./bin/spark-shell
-- create the iceberg table
spark.sql("create table test.iceberg_test (id bigint,name string)").show()
-- read the hive table hive_test.iceberg_test and insert the rows into the iceberg table test.iceberg_test
spark.sql("insert into table test.iceberg_test select * from hive_test.iceberg_test").show()
spark.sql("insert overwrite table test.iceberg_test select * from hive_test.iceberg_test").show()
-- query the iceberg table test.iceberg_test; the data is now there
scala> spark.sql("select * from test.iceberg_test").show()
+---+--------+
| id| name|
+---+--------+
| 1|zhangsan|
| 2| lisi|
| 3| wangwu|
+---+--------+
6. Other Iceberg commands
-- print the table schema
spark.table("hadoop_catalog.test.spark_test_table").printSchema()
-- read data from an existing table
spark.read.format("iceberg").table("hadoop_catalog.test.spark_test_table").show()
spark.sql("select * from hadoop_catalog.test.spark_test_table").show()
-- read data at a specific snapshot
spark.read.option("snapshot-id", 3372567346381641315L).format("iceberg").load("/doit/iceberg/warehouse/default/tb_test1").show
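To find valid snapshot ids for the time-travel read above, the snapshots metadata table can be queried. A minimal Java sketch, reusing the table and path from the examples above (which are themselves placeholders):
import org.apache.spark.sql.SparkSession;

public class IcebergSnapshotsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("iceberg-snapshots-sketch")
                .getOrCreate();

        // Every Iceberg table exposes metadata tables such as .snapshots and .history.
        spark.sql("select committed_at, snapshot_id, operation " +
                "from hadoop_catalog.test.spark_test_table.snapshots").show();

        // Path-based time travel, mirroring the read above (the snapshot id is a placeholder).
        spark.read()
                .option("snapshot-id", "3372567346381641315")
                .format("iceberg")
                .load("/doit/iceberg/warehouse/default/tb_test1")
                .show();

        spark.stop();
    }
}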
To be continued!