Flink -- 将 MySQL 数据插入到 Iceberg
程序员文章站
2022-07-14 18:47:05
...
1. 创建 Catalog
-- Register an Iceberg catalog named hadoop_catalog. It uses the 'hadoop'
-- catalog type and keeps its warehouse under the HDFS path given below.
CREATE CATALOG hadoop_catalog WITH (
    'type'             = 'iceberg',
    'catalog-type'     = 'hadoop',
    'warehouse'        = 'hdfs://node01.com:8020/flink/warehouse/',
    'property-version' = '1'
);
2.创建分区表
-- Make sure the target database exists inside hadoop_catalog, so the table
-- definition below does not rely on earlier, unseen session state.
CREATE DATABASE IF NOT EXISTS hadoop_catalog.iceberg_db;

-- Create the partitioned table. The name is fully qualified with the catalog
-- so the table lands in hadoop_catalog regardless of the session's current
-- catalog; an unqualified iceberg_db.testA would resolve against whatever
-- catalog happens to be active.
CREATE TABLE hadoop_catalog.iceberg_db.testA (
    id   BIGINT,
    name STRING,
    age  INT,
    dt   STRING   -- partition column (see PARTITIONED BY below)
)
PARTITIONED BY (dt);
3. 插入数据
-- Seed two sample rows. Values are positional: (id, name, age, dt).
-- The target is catalog-qualified so the statement does not depend on the
-- session's current catalog.
INSERT INTO hadoop_catalog.iceberg_db.testA
VALUES
    (1005, 'spark', 18, '2021-09-10'),
    (1006, 'hbase', 19, '2021-09-11');
4. 将 MySQL 数据插入到 Iceberg 表中
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL> create table mysql_iceberg07(id bigint, name string, age int,dt string)
> with(
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.168.1.180:3306/test',
> 'username'='root',
> 'password'='123456',
> 'table-name' = 'Flink_iceberg'
> );
[INFO] Table has been created.
Flink SQL> select * from mysql_iceberg07;
[INFO] Result retrieval cancelled.
Flink SQL> select * from hadoop_catalog.iceberg_db.testA;
[INFO] Result retrieval cancelled.
Flink SQL> insert into hadoop_catalog.iceberg_db.testA select * from mysql_iceberg07;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 94da626a9645b4aef7069418a80687f0
Flink SQL> select * from hadoop_catalog.iceberg_db.testA;
Table program finished. Page: Last of 1 Updated: 01:02:05.418
id name age dt
1005 spark 18 2021-09-10
1006 hbase 19 2021-09-11
10011 flink-mysql 19 2021-09-24
1003 wudl 18 2021-09-10
1004 flink 19 2021-09-11