数据湖之iceberg系列(六)-flink处理数据
1 集群搭建
安装flink集群 ,启动flink集群!
bin/start-cluster.sh
将 iceberg-flink-runtime-0.10.0.jar 上传到 flink 的 lib 目录下
启动flink-sql
bin/sql-client.sh embedded -j ./lib/iceberg-flink-runtime-0.10.0.jar shell
2 快速入门
-- Hive catalog: Iceberg table metadata is tracked by the Hive metastore.
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  -- Hive metastore thrift endpoint
  'uri'='thrift://linux01:9083',
  -- metastore client pool size
  'clients'='5',
  'property-version'='1',
  -- fixed: removed the doubled slash after the namenode port (was "8020//user")
  'warehouse'='hdfs://linux01:8020/user/hive/warehouse/flink'
);
-- Hadoop catalog: Iceberg metadata is stored directly on HDFS; no Hive metastore required.
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
-- root HDFS directory under which this catalog's databases/tables are created
'warehouse'='hdfs://linux01:8020/warehouse/flink',
'property-version'='1'
);
切换catalog ;
USE CATALOG hadoop_catalog;
创建数据库
CREATE DATABASE iceberg_db;
USE iceberg_db;
建表
CREATE TABLE t2 (id BIGINT, name STRING) PARTITIONED BY (name);
插入数据
INSERT INTO t2 VALUES (1, 'hangge');
设置参数
-- Execute the flink job in streaming mode for current session context
SET execution.type = streaming;
-- Execute the flink job in batch mode for current session context
SET execution.type = batch;
查询数据
SELECT * FROM t2;
3 基本操作
3.1 flink-sql操作
建库
CREATE DATABASE ..
建表
-- Create an (unpartitioned) Iceberg table in the Hive catalog's "default" database.
CREATE TABLE hive_catalog.default.sample (
id BIGINT COMMENT 'unique id',
data STRING
);
Table create commands support the most commonly used flink create clauses now, including:
PARTITIONED BY (column1, column2, ...) to configure partitioning, apache flink does not yet support hidden partitioning.
COMMENT 'table document' to set a table description.
WITH ('key'='value', ...) to set table configuration which will be stored in apache iceberg table properties.
Currently, it does not support computed column, primary key and watermark definition etc.
插入数据
INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
查询数据
SET execution.type = batch ;
SELECT * FROM sample ;
修改表
ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro');
重命名
ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;
删除表
DROP TABLE hive_catalog.default.sample;
3.2 DataStream 操作
Read with DataStream (不支持)
Write with DataStream
追加写
// Append rows from a DataStream<RowData> into an existing Iceberg table.
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
// fixed: the API method is fromHadoopTable (capital T); "fromHadooptable" does not exist
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .hadoopConf(hadoopConf)
    .build();
env.execute("Test Iceberg DataStream");
覆盖写
// Overwrite the table's existing data with the rows from a DataStream<RowData>.
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
// fixed: the API method is fromHadoopTable (capital T); "fromHadooptable" does not exist
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    // overwrite(true) replaces existing data instead of appending
    .overwrite(true)
    .hadoopConf(hadoopConf)
    .build();
env.execute("Test Iceberg DataStream");
上一篇: day4_关联关系