
Flink: Inserting MySQL data into Iceberg

程序员文章站 2022-07-14 18:47:05

1. Create the catalog

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://node01.com:8020/flink/warehouse/',
  'property-version'='1'
);

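2. Create a partitioned table

-- Switch to the Iceberg catalog and make sure the database exists.
-- (These two statements are not in the original listing; they are implied by the
-- hadoop_catalog.iceberg_db.testA references used in step 4.)
USE CATALOG hadoop_catalog;
CREATE DATABASE IF NOT EXISTS iceberg_db;

-- Create a table partitioned by dt
create table iceberg_db.testA(
id bigint,
name string,
age int,
dt string
)
PARTITIONED by(dt);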

3. Insert data

insert into iceberg_db.testA values(1005,'spark',18,'2021-09-10'),(1006,'hbase',19,'2021-09-11');
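To confirm that the rows landed, a query filtered on the partition column can be run afterwards. This is only a sketch: the table name is fully qualified so that it does not depend on the current catalog, and the date literal is simply one of the values inserted above.

```sql
-- Read back one partition of the Iceberg table.
-- dt is the partition column, so the filter only needs to touch that partition.
SELECT id, name, age, dt
FROM hadoop_catalog.iceberg_db.testA
WHERE dt = '2021-09-10';
```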

4. Insert MySQL data into the Iceberg table
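
The session in this step reads from a MySQL table named Flink_iceberg in the test database, but the post does not show how that table was defined. A minimal sketch of the MySQL side follows. The column types are assumptions chosen to line up with the Flink schema (bigint, string, int, string), and the sample row is taken from the query result at the end of this post purely for illustration. The Flink JDBC connector and a MySQL JDBC driver also need to be on the Flink classpath for the JDBC table to work.

```sql
-- Run in MySQL, not in the Flink SQL client.
-- Column types are assumed; adjust them to the real table.
CREATE TABLE Flink_iceberg (
  id   BIGINT,
  name VARCHAR(255),
  age  INT,
  dt   VARCHAR(32)
);

-- Illustrative row only (it appears in the final result of this post).
INSERT INTO Flink_iceberg VALUES (10011, 'flink-mysql', 19, '2021-09-24');
```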

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL> create table mysql_iceberg07(id bigint, name string, age int,dt string)
> with(
>     'connector' = 'jdbc',
>     'url' = 'jdbc:mysql://192.168.1.180:3306/test',
>     'username'='root',
>     'password'='123456',
>     'table-name' = 'Flink_iceberg'
> );
[INFO] Table has been created.

Flink SQL> select * from mysql_iceberg07;
[INFO] Result retrieval cancelled.

Flink SQL> select * from hadoop_catalog.iceberg_db.testA;
[INFO] Result retrieval cancelled.

Flink SQL> insert into hadoop_catalog.iceberg_db.testA  select * from mysql_iceberg07;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 94da626a9645b4aef7069418a80687f0


Flink SQL> select * from hadoop_catalog.iceberg_db.testA;
Table program finished.                                                                      Page: Last of 1                                                                         Updated: 01:02:05.418 

                        id                      name                       age                        dt
                      1005                     spark                        18                2021-09-10
                      1006                     hbase                        19                2021-09-11
                     10011               flink-mysql                        19                2021-09-24
                      1003                      wudl                        18                2021-09-10
                      1004                     flink                        19                2021-09-11
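
The copy in this session uses select *, which works because the JDBC table and the Iceberg table declare their columns in the same order with the same types. As a more defensive variant, the columns can be listed explicitly so that a later change to either schema does not silently shift values into the wrong column. A sketch using the same table names as the session:

```sql
-- Same copy as in the session, with the columns named in the target table's order.
INSERT INTO hadoop_catalog.iceberg_db.testA
SELECT id, name, age, dt
FROM mysql_iceberg07;
```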