欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Delta Lake 分区表覆盖写入操作

程序员文章站 2022-07-14 20:46:22
...

Delta Lake当前版本(0.5)只支持API操作的,但是实现 Insert SQL 语法也不难,可以参考 Delta Lake 平台化实践(离线篇),需要注意的是 Delta Lake 中的分区表覆盖写入操作。

INSERT OVERWRITE

INSERT OVERWRITE TABLE 分区表时分三种场景:

  1. 动态分区 - 写入前会删除所有分区,并根据数据中的分区字段写入相应新分区
  2. 静态分区 - 只会对指定的分区进行覆盖写操作
  3. 混合分区(动态+静态分区) - 上述两种情况的结合

如果想通过 SQL 转化为上述 API ,首先需要在 sql parser 的时候获取到 insertMode 和 partitions 信息,并将 partitions 信息存在一个有序的结构中,例如 LinkedHashMap。然后利用这些信息,就可以拼装进行拼装实现上述三种场景。

动态分区

对所有 ds 分区进行覆盖写操作,将会清空所有 ds 分区
sql

INSERT OVERWRITE TABLE db.tableA partition(ds) select name,ds from db.tableB

Delta Lake API

df.write.format("delta").mode("overwrite").partitionBy(ds)

静态分区

对 ds=20200101 的分区进行覆盖写操作
sql

INSERT OVERWRITE TABLE db.tableA partition(ds=20200101) select name from db.tableB

Delta Lake API

df.write.format("delta").mode("overwrite").option("replaceWhere", "ds = 20200101").partitionBy(ds)

混合分区

对 ds=20200101 中的所有 event 的分区进行覆盖写操作,将会清空所有 event 分区
sql

INSERT OVERWRITE TABLE db.tableA partition(ds=20200101,event) select name,event from db.tableB

Delta Lake API

df.write.format("delta").mode("overwrite").option("replaceWhere", "ds = 20200101").partitionBy(ds,event)

后记

  1. 分区操作,一定要保证 partition 信息的有序

  2. 新表需要从 hive metastore 中获取 partition 信息,Delta Table 在第一次写入数据前,是不会生成 _DELTA_LOG 目录的,此时可以从 hive metastore 中获取建表时的分区名和其对应的类型,例如:

    //ddl: `ds` INT COMMENT 'ds'
    val ddl = spark.sharedState.externalCatalog.getTable(dbName, tableName).partitionSchema.toDDL
    val partitionNameAndType = new mutable.HashMap[String, String]()
    
    ddl.split(",").foreach { r =>
      val x = r.split(" ")
      partitionNameAndType.put(x(0).replace("`", ""), x(1))
    }
    
  3. 语义不同

    Hive Table 直接使用 insert overwrite 动态分区只会覆盖数据涉及到的分区,而 Spark 和 Delta Lake 的 API 则会将所有所有分区进行覆盖。Spark 2.3 以后也可以通过下述API实现 Hive insert overwrite 语义

    spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
    data.write.mode("overwrite").insertInto("partitioned_table")
    
  4. 动态分区覆盖写是高危操作

    该操作很有可能会删除一些你不期望的数据,所以 Delta Lake 目前的 API 提供了 replaceWhere option 进行约束

相关标签: Spark Delta Lake