欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Delta Lake 实践(离线篇)

程序员文章站 2022-07-14 20:38:52
...

Delta Lake 是什么?简单的说就是为大数据场景添加了事务功能,并且支持了 update/delete/merge into 等功能, Delta Lake 初探

要将 Delta Lake 与 spark2.4 sql 以及大数据平台打通还是有一些工作需要去做,下文是在该过程中的一些实践及思考。

SQL 支持

DML

背景

delta lake 0.4 只支持以 api 的方式使用 Delete/Update/Merge Into 等 DML,对习惯了使用 sql 的终端用户会增加其学习使用成本。

解决方式

使用 spark sql extension 以插件化的方式扩展 sql parser ,增加 DML 语法的支持。在 spark 推出 sql extension 功能前,为了不修改 spark 的源码,也可以用通过 aspectj 的方式实现增加自定义语法的功能,这两种方式的实现都在之前的文章有过说明。

  1. 增加相应的 DML antlr4 语法

    databrick 版本的 delta lake 其实已经有相关语法了,参考下语法做下本地化修改就好了,我们这边会对用户屏蔽低层 path ,所以就把 path 这类语法都去掉了。

    Delta 相关语法文件

   statement
       : DELETE FROM table=qualifiedName tableAlias
                 (WHERE where=booleanExpression)?                              #deleteFromTable
       | UPDATE table=qualifiedName tableAlias upset=setClause
                 (WHERE where=booleanExpression)?                              #updateTable
       | VACUUM table=qualifiedName
           (RETAIN number HOURS)? (DRY RUN)?                                   #vacuumTable
       | (DESC | DESCRIBE) HISTORY table=qualifiedName
           (LIMIT limit=INTEGER_VALUE)?                                        #describeDeltaHistory
       | MERGE INTO target=qualifiedName targetAlias=tableAlias
               USING (source=qualifiedName |
                 '(' sourceQuery=query')') sourceAlias=tableAlias
               ON mergeCondition=booleanExpression
               matchedClause*
               notMatchedClause*                                               #mergeIntoTable
  1. 实现对应的 visit,将 sql 翻译为 delta api

    以最简单的 delete 为例

     override def visitDeleteFromTable(ctx: DeleteFromTableContext): AnyRef = withOrigin(ctx) {
       DeleteTableCommand(
         visitTableIdentifier(ctx.table),
         Option(getText(ctx.where)))
     }
     
     case class DeleteTableCommand(table: TableIdentifier,
                                 where: Option[String]) extends RunnableCommand {
     override def run(sparkSession: SparkSession): Seq[Row] = {
       DeltaUtils.deltaTableCheck(sparkSession, table, "DELETE")
       val deltaTable = DeltaUtils.getDeltaTable(sparkSession, table)
       if (where.isEmpty) {
         deltaTable.delete()
       } else {
         deltaTable.delete(where.get)
       }
       Seq.empty[Row]
     }
   }
  1. 启动加入相应的 extension jar ,初始化 SparkSession 时指定 Extension 类。
val spark = SparkSession.builder
    .enableHiveSupport()
    .config("spark.sql.extensions", "cn.tongdun.spark.sql.TDExtensions")

tip
spark 3 之前不支持配置多个 extension ,如果遇到使用多个 extension 的情况,可以将多个 extension 在一个 extension 代码中进行注入。

以同时增加 tispark extension 和 自定义 extension 为例

override def apply(extensions: SparkSessionExtensions): Unit = {
		extensions.injectParser(TiParser(getOrCreateTiContext))
    extensions.injectResolutionRule(TiDDLRule(getOrCreateTiContext))
    extensions.injectResolutionRule(TiResolutionRule(getOrCreateTiContext))
    extensions.injectPlannerStrategy(TiStrategy(getOrCreateTiContext))
    extensions.injectParser { (session, parser) => new TDSparkSqlParser(session, parser)}
}

Query

识别 delta table 有三种实现方式

  1. 使用相应表名前缀/后缀作为标识
  2. 在 table property 中增加相应的参数
  3. 判断表是否存在_delta_log

我们一开始是使用 delta_ 表名的前缀作为 delta 标识,这样实现最为简单,但是如果用户想将 hive(parquet) 表转为 hive(delta) ,要是表名发生变化则需要修改相关代码,所以后面改为在table propertie 中增加相应的参数进行识别。
还或者通过判断是否存在 _delta_log 文件识别,该方式需要在建表时写入带有 schema 信息的空数据。

Query 通过对sql执行进行拦截,判断 Statement 为 SELECT 类型,然后进行数据源适配

if (statementType == SELECT) {
		TableData tableData = (TableData) statementData.getStatement();
  	sql = DatasourceAdapter.selectAdapter(tableData, sparkSession, sql);
}

Insert

Insert 需要考虑 INSERT_VALUES / INSERT_SELECT 还有分区表/非分区表以及写入方式的一些情况。

sql 类型判断

if (INSERT_SELECT == statementType) {
		isDeltaTable = DatasourceAdapter.deltaInsertSelectAdapter(sparkSession, statementData);
} else if (INSERT_VALUES == statementType) {
		isDeltaTable = DatasourceAdapter.deltaInsertValuesAdapter(sparkSession, statementData);
}

INSERT_INTO 需要从 catalog 中获取对应的 schema 信息,并将 values 转化为 df

val rows = statementData.getValues.asScala.map(_.asScala.toSeq).map { x => Row(x: _*) }
import spark.implicits._
val schemaStr = spark.catalog.listColumns(dbName, tableName)
    .map(col => col.name + " " + col.dataType)
    .collect().mkString(",")
val schema = StructType.fromDDL(schemaStr)
val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows), schema)

INSERT_SELECT 获取 select 信息转化为相应 df 。

partition

由于 delta api 的限制,不支持静态分区,在 table 中解析到对应的动态分区名,使用 partitionBy 写入即可。

至此,已经实现使用 apache spark 2.4 使用 sql 直接操作 delta table 表。

平台化工作

主要是与 hive metastore 的集成 / 表状态收集 / 管理 等本地化的一些工作。

浏览 delta 数据

用户在平台上点击浏览数据,如果通过 delta api ,启动 spark job 的方式依赖过重,延时较高,用户体验差。

基于之前 parquet 格式已经完成的一些工作,其实这部分可以简化为找出 delta 事务日志中还存活 (add - remove) 的 parquet 文件进行读取展示,这样就避免了启动 spark 的过程,大多数情况能做到毫秒级返回数据。

值的注意的是,_delta_log 文件只存在父目录,浏览某个分区的数据同样需要浏览父目录获取相应分区内的存活文件。

// DeltaHelper.load 方法会从 _delta_log 目录中找到存活 parquet 文件,然后使用 ParquetFileReader 读取
List<Path> inputFiles;
if (DeltaHelper.isDeltaTable(dir, conf)) {
    inputFiles = DeltaHelper.load(dir, conf);
} else {
    inputFiles = getInputFilesFromDirectory(projectCode, dir);
}

元数据兼容

将原生 delta lake 基于 path 的工作方式与 hive metastore 进行兼容。

数据写入/删除

分区表数据插入

​ 使用动态分区插入数据后,统计到相应分区的信息(我们是修改了 spark 相应代码得到的写入分区信息),如果分区不存在则自动增加分区 add partition if ...。还有一种更简单的做法是直接使用 msck repair table ,但是这种方式在分区多的情况下,性能会非常糟糕。

删除分区

​ 在界面上操作对某个分区进行删除时,后台调用 delta 删除api,并更新相关 partition 信息。

统计信息更新

元数据中表/分区记录数,大小等元数据的更新支持。

碎片文件整理

使用 delta lake 之前小文件整理方式可以参考 Spark 小文件合并优化实践,非 delta lake 表的小文件整理使用的是同步模式,可能会影响到下有任务的启动时间。

基于 delta lake 的小文件整理要分为两块,存活数据和标记删除的数据

  1. 标记删除的数据

    被 delta 删除的数据,底层 parquet 文件依旧存在,只是在 delta_log 中做了标记,读取时跳过了该文件。

    可以使用 delta 自带的 vacuum 功能删除一定时间之前标记删除的数据。

  2. 存活数据

    不断写入的小文件可以基于 delta 的特性,可以实现一个 compaction 功能,然后后台不断的做异步合并,不影响数据的使用方。

结语

一些限制

  1. 由于 delta api 的限制,目前 delta delete / update 不支持子句,可以使用 merge into 语法实现相同功能。
  2. 由于 delta api 的限制,只支持动态分区插入。

merge 使用场景

有 a1,a2 两张表,如果 a.1eventId = a2.eventId ,则 a2.data 会覆盖 a1.data,否则将 a2 表中相应的数据插入到 a1 表

MERGE INTO bigdata.table1 a1
USING bigdata.table2 a2
ON a1.eventId = a2.eventId
WHEN MATCHED THEN
  UPDATE SET a1.data = a2.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (a2.date, a2.eventId, a2.data)

ETL 避免数据重复场景
避免插入重复的记录
如果 uniqueId 不存在于 a1,则插入 a2 表中的相应记录

MERGE INTO logs a1
USING updates a2
ON a1.uniqueId = a2.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

维度表更新场景

  • 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 为 true ,则删除 a1 表相应记录
  • 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 为 false ,则将 a2 表相应记录的 value 更新到 a1 表中
  • 如果没有匹配到相应合作方,且 a2 中 deleted 为 fasle ,则将 a2 表相应记录插入到 a1 表
MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a2.deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET a1.value = a2.newValue
WHEN NOT MATCHED AND a2.deleted = false THEN INSERT (partnerCode, value) VALUES (partnerCode, newValue)

历史数据清理场景
如果 a1 和 a2 表的合作方相同,则删除 a1 表中 ds < 20190101 的所有数据

MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a1.ds < '20190101' THEN
  DELETE