
Data Lake Iceberg Series (6): Processing Data with Flink

程序员文章站 2022-03-08 11:53:56

1 Cluster Setup

Install a Flink cluster, then start it:

bin/start-cluster.sh


Upload the flink-run...jar to Flink's lib directory.


Start the Flink SQL client:

bin/sql-client.sh embedded -j ./lib/iceberg-flink-runtime-0.10.0.jar shell


2 Quick Start

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://linux01:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://linux01:8020/user/hive/warehouse/flink'
);
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://linux01:8020/warehouse/flink',
  'property-version'='1'
);
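
Once a catalog is registered, its tables can also be addressed by fully qualified name without switching catalogs first. A minimal sketch, assuming the hadoop_catalog defined above and an illustrative database/table name:

```sql
-- Fully qualified form: <catalog>.<database>.<table>
CREATE DATABASE hadoop_catalog.iceberg_db;
CREATE TABLE hadoop_catalog.iceberg_db.sample (id BIGINT, data STRING);
SELECT * FROM hadoop_catalog.iceberg_db.sample;
```

Alternatively, switch the session's current catalog as shown below, after which unqualified names resolve against it.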

Switch catalog:

USE CATALOG hadoop_catalog;

Create a database

CREATE DATABASE iceberg_db;   

USE iceberg_db;   

Create a table

CREATE TABLE t2 (id BIGINT, name STRING) PARTITIONED BY (name);

Insert data

INSERT INTO t2 VALUES (1, 'hangge');

Set parameters

-- Execute the flink job in streaming mode for current session context
SET execution.type = streaming;

-- Execute the flink job in batch mode for current session context
SET execution.type = batch;

Query data

SELECT * FROM t2;


3 Basic Operations

3.1 Flink SQL operations

Create a database

CREATE  DATABASE ..

Create a table

CREATE TABLE hive_catalog.default.sample (
    id BIGINT COMMENT 'unique id',
    data STRING
);

Table create commands support the most commonly used Flink create clauses, including:

PARTITIONED BY (column1, column2, ...) to configure partitioning; Apache Flink does not yet support hidden partitioning.
COMMENT 'table document' to set a table description.
WITH ('key'='value', ...) to set table configuration, which will be stored in Apache Iceberg table properties.

Currently, computed columns, primary keys, and watermark definitions are not supported.
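
Putting those clauses together, a sketch of a table DDL that uses all three (the table name, columns, and property value here are illustrative):

```sql
CREATE TABLE hive_catalog.default.logs (
    id BIGINT COMMENT 'unique id',
    data STRING,
    dt STRING
) PARTITIONED BY (dt)
COMMENT 'table document'
WITH ('write.format.default'='parquet');
```

The WITH properties land in the Iceberg table's own properties, so they follow the table regardless of which engine reads it later.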

Insert data

INSERT INTO hive_catalog.default.sample VALUES (1, 'a');

INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
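
INSERT OVERWRITE can also be issued without a static PARTITION clause, in which case the partitions produced by the query are replaced. A sketch, reusing the sample table (values are illustrative; overwrite requires batch execution mode):

```sql
-- In batch mode, overwrite replaces the table's existing data
-- (for a partitioned table, only the partitions the output falls into).
SET execution.type = batch;
INSERT OVERWRITE hive_catalog.default.sample VALUES (1, 'a'), (2, 'b');
```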

Query data

SET execution.type = batch;

SELECT * FROM sample;

Alter a table

ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro');

Rename a table

ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;

Drop a table

DROP TABLE hive_catalog.default.sample;

3.2 DataStream operations

Read with DataStream (not supported)
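
Since this release has no DataStream read path, the available alternative is a batch scan through the SQL/Table API shown in section 3.1:

```sql
SET execution.type = batch;
SELECT * FROM hive_catalog.default.sample;
```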

Write with DataStream

Appending data

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");

Overwriting data

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");