欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Delta Lake - 增删改事务操作之大结局

程序员文章站 2022-07-14 20:38:28
...

在《Delta Lake - 数据写入的旅程》和《Delta Lake - 数据更新的旅程》文章中,我们已经从源码层面掌握了 Delta Lake 数据写入和数据更新的实现过程,并结合案例进行实战,相信读者应该有比较深入的理解。

针对不再使用或有异常的数据,我们需要进行删除操作。那么 Delta Lake 数据删除是如何实现的呢?

笔者将在本章基于源码研究 Delta Lake 数据删除的始末。

本篇文章为 Delta 增删改的最后一部分内容,学完后,读者将真正入门 Delta Lake。

数据删除示例

笔者使用《Delta Lake - 数据更新的旅程》文章中的数据更新的结果进行删除操作。

以 Scala 编程语言实现,首先看一下之前的 Delta 数据:

  1. scala> import io.delta.tables._

  2. scala> val deltaTable = DeltaTable.forPath(spark,"/spark/datasets/delta/")

  3. scala> deltaTable.toDF.show()

  4. +---+-------+

  5. |age| name|

  6. +---+-------+

  7. | 40|Michael|

  8. | 30| Andy|

  9. | 19| Justin|

  10. +---+-------+

执行 Delta 删除操作:

  1. scala> deltaTable.delete("name = 'Justin'")

  2. scala> deltaTable.toDF.show()

  3. +---+-------+

  4. |age| name|

  5. +---+-------+

  6. | 40|Michael|

  7. | 30| Andy|

  8. +---+-------+

Delta 删除操作历史记录

我们可以通过 Delta Lake 的 HISTORY 命令查看操作历史记录(按时间倒序返回): 

Delta Lake - 增删改事务操作之大结局

  1. scala> deltaTable.history().show()

  2. +-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

  3. |version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|

  4. +-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

  5. | 3|2019-12-02 22:30:...| null| null| DELETE|[predicate -> ["(...|null| null| null| 2| null| false|

  6. | 2|2019-11-21 16:38:...| null| null| UPDATE|[predicate -> (na...|null| null| null| 1| null| false|

  7. | 1|2019-11-21 16:29:...| null| null| UPDATE|[predicate -> (na...|null| null| null| 0| null| false|

  8. | 0|2019-11-21 16:18:...| null| null| WRITE|[mode -> ErrorIfE...|null| null| null| null| null| true|

  9. +-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

HISTORY 命令返回的结果中,version 为 3 的记录,即为本次 Delta 删除的操作记录。

查看删除操作的详细记录:

  1. scala> deltaTable.history(1).show(false)

  2. +-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+

  3. |version|timestamp |userId|userName|operation|operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|

  4. +-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+

  5. |3 |2019-12-02 22:30:49.969|null |null |DELETE |[predicate -> ["(`name` = 'Justin')"]]|null|null |null |2 |null |false |

  6. +-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+

Delta 事务日志分析

上面 Delta 删除操作成功后,则会生成一个事务日志,如下:

  1. $ hdfs dfs -ls /spark/datasets/delta/_delta_log/00000000000000000003.json

  2. {"commitInfo":{"timestamp":1575297049841,"operation":"DELETE","operationParameters":{"predicate":"[\"(`name` = 'Justin')\"]"},"readVersion":2,"isBlindAppend":false}}

  3. {"remove":{"path":"part-00000-9c1da674-7c4d-4061-ba3c-0ae3926bd593-c000.snappy.parquet","deletionTimestamp":1575297049819,"dataChange":true}}

  4. {"add":{"path":"part-00000-240bcdd5-b087-4696-bdc4-f0ce64dcc7ae-c000.snappy.parquet","partitionValues":{},"size":641,"modificationTime":1575297049785,"dataChange":true}}

事务日志的具体含义,之前都详细讲解过,这里不再重复说明。

有几点需要补充一下:

  • 1. Delta Lake Delete 操作在最新版本中支持 Scala、Java、Python API,不支持 SQL,而在 Databricks Runtime 商业版本中才支持 SQL。

  • 2. Delta Lake Delete 操作成功后,其底层存储的数据并没有被删除,而是在事务日志里面标记删除状态。执行 vacuum 命令后,数据才真正被删除。

接下来,我们进入正题,基于源码去深入理解 Delta Lake Delete 操作。

数据删除的旅程

通过前面的更新操作,我们知道 Delete API 也在 io.delta.tables.DeltaTable 中实现的,涉及删除的方法有三个:

  1. // Delete data from the table that match the given `condition`.

  2. def delete(condition: String): Unit = {

  3. delete(functions.expr(condition))

  4. }

  5. // Delete data from the table that match the given `condition`.

  6. def delete(condition: Column): Unit = {

  7. executeDelete(Some(condition.expr))

  8. }

  9. // Delete data from the table.

  10. def delete(): Unit = {

  11. executeDelete(None)

  12. }

上面定义的三个函数最终都是调用 executeDelete 函数,该函数定义在 io.delta.tables.execution.DeltaTableOperations,提供 DeltaTable 操作的实际实现的接口。

executeDelete 函数实现内容如下:

  1. // Catalyst 中的表达式

  2. protected def executeDelete(condition: Option[Expression]): Unit = {

  3. // Delete 为 case class Delete(child: LogicalPlan, condition: Option[Expression])

  4. // child 为 Delta Lake 表的 Analyzed Logical Plan

  5. // condition 为执行删除操作的条件表达式

  6. val delete = Delete(self.toDF.queryExecution.analyzed, condition)

  7. // 当然版本的 DELETE 不支持子查询

  8. subqueryNotSupportedCheck(condition, "DELETE")

  9. // 生成执行计划

  10. val qe = sparkSession.sessionState.executePlan(delete)

  11. val resolvedDelete = qe.analyzed.asInstanceOf[Delete]

  12. // 下面重点分析 DeleteCommand

  13. val deleteCommand = DeleteCommand(resolvedDelete)

  14. deleteCommand.run(sparkSession)

  15. }

接下来,笔者重点分析如下操作:

  1. val deleteCommand = DeleteCommand(resolvedDelete)

  2. deleteCommand.run(sparkSession)

DeleteCommand case class 中定义了伴生对象 DeleteCommand,里面定义了 apply 方法,如下:

  1. object DeleteCommand {

  2. def apply(delete: Delete): DeleteCommand = {

  3. val index = EliminateSubqueryAliases(delete.child) match {

  4. case DeltaFullTable(tahoeFileIndex) =>

  5. tahoeFileIndex

  6. case o =>

  7. throw DeltaErrors.notADeltaSourceException("DELETE", Some(o))

  8. }

  9. DeleteCommand(index, delete.child, delete.condition)

  10. }

  11. val FILE_NAME_COLUMN = "_input_file_name_"

  12. }

DeleteCommand(resolvedDelete) 调用了 apply 方法,初始化 DeleteCommand,DeleteCommand 定义为:

  1. case class DeleteCommand(

  2. tahoeFileIndex: TahoeFileIndex,

  3. target: LogicalPlan,

  4. condition: Option[Expression])

  5. extends RunnableCommand with DeltaCommand

可知,DeleteCommand 类扩展了 Spark 的 RunnableCommand trait,看一下 RunnableCommand :

  1. trait RunnableCommand extends Command {

  2. // The map used to record the metrics of running the command. This will be passed to

  3. // `ExecutedCommand` during query planning.

  4. lazy val metrics: Map[String, SQLMetric] = Map.empty

  5. def run(sparkSession: SparkSession): Seq[Row]

  6. }

根据 RunnableCommand trait,DeleteCommand 需要实现 run 方法,前面我们学习过的 update 和 merge 也是继承这个类。

DeleteCommand 的 run 方法实现如下:

  1. final override def run(sparkSession: SparkSession): Seq[Row] = {

  2. // 用于记录持续时间以及操作的成功或失败

  3. recordDeltaOperation(tahoeFileIndex.deltaLog, "delta.dml.update") {

  4. // 获取事务日志对象

  5. val deltaLog = tahoeFileIndex.deltaLog

  6. // 检查 Delta 表是否支持删除操作

  7. // 因为 Delta Lake 表允许用户设置成 appendOnly,即只能追加,所以需要检查

  8. deltaLog.assertRemovable()

  9. // 开启新事务,执行删除操作,保证原子性

  10. deltaLog.withNewTransaction { txn =>

  11. performUpdate(sparkSession, deltaLog, txn)

  12. }

  13. // Re-cache all cached plans(including this relation itself, if it's cached) that refer to

  14. // this data source relation.

  15. sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)

  16. }

  17. Seq.empty[Row]

  18. }

这里我们再回顾一下 withNewTransaction 事务的实现:

  1. def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {

  2. try {

  3. // 通过应用新的增量文件(如果有)来更新ActionLog

  4. // 更新当前表事务日志的快照

  5. update()

  6. // 实例化乐观事务锁对象

  7. val txn = new OptimisticTransaction(this)

  8. // 开启乐观事务锁

  9. OptimisticTransaction.setActive(txn)

  10. // performDelete(sparkSession, deltaLog, txn) 操作

  11. // Delta Delete 的核心操作方法

  12. thunk(txn)

  13. } finally {

  14. // 关闭事务

  15. OptimisticTransaction.clearActive()

  16. }

  17. }

Delta Lake 删除的核心代码定义在 performDelete 方法中,下面我们具体分析源码,并附上注释,方便读者查看:

  1. private def performDelete(

  2. sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction) = {

  3. import sparkSession.implicits._

  4. // 统计信息

  5. var numTouchedFiles: Long = 0

  6. var numRewrittenFiles: Long = 0

  7. var scanTimeMs: Long = 0

  8. var rewriteTimeMs: Long = 0

  9. // 开始时间

  10. val startTime = System.nanoTime()

  11. val numFilesTotal = deltaLog.snapshot.numOfFiles

  12. val deleteActions: Seq[Action] = condition match {

  13. // Delta delete 分为几种情况,下面依次进行解释

  14. // 如果执行 delete 时,没有传递任何删除条件,则删除当前 Delta 表的所有数据,对应 Case 1 处理方式

  15. case None =>

  16. // Case 1: 如果 condition 为 true,直接删除 Delta 表对应的所有文件即可

  17. // 获取内存中快照里面所有的 AddFile 文件

  18. val allFiles = txn.filterFiles(Nil)

  19. // 文件数量

  20. numTouchedFiles = allFiles.size

  21. scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

  22. val operationTimestamp = System.currentTimeMillis()

  23. // 将 AddFile 标记成 RemoveFile,用于标记删除

  24. allFiles.map(_.removeWithTimestamp(operationTimestamp))

  25. // 如果删除时传递了条件,则对应 Case 2 和 Case 3 处理方式

  26. case Some(cond) =>

  27. // 针对表有分区的分区条件和其他删除条件处理

  28. val (metadataPredicates, otherPredicates) =

  29. DeltaTableUtils.splitMetadataAndDataPredicates(

  30. cond, txn.metadata.partitionColumns, sparkSession)

  31. // 其他删除条件为空,只有分区删除条件,即使用分区字段

  32. if (otherPredicates.isEmpty) {

  33. // Case 2: 从缓存在内存中的 snapshot 中获取需要删除的文件,然后直接删除,不需要执行数据重写操作

  34. val operationTimestamp = System.currentTimeMillis()

  35. // 从快照中拿出符合这个分区删除条件的 AddFile 文件

  36. val candidateFiles = txn.filterFiles(metadataPredicates)

  37. scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

  38. numTouchedFiles = candidateFiles.size

  39. // 对于上面选出符合条件的文件,标记为 RemoveFile

  40. candidateFiles.map(_.removeWithTimestamp(operationTimestamp))

  41. } else {

  42. // 这是最后一种情况 Case 3,稍微复杂点

  43. // Case 3: 用户删除 Delta 表时,删除条件含有一些非分区字段的过滤条件

  44. // 这种情况还要分为 Case 3.1 和 Case 3.2 两种情况,下面会一一说明

  45. // 根据分区条件和其他条件,找到删除的数据潜在的文件列表

  46. val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)

  47. // 记录文件数

  48. numTouchedFiles = candidateFiles.size

  49. // 生成文件名 map,以添加用于执行需要重写文件(例如删除、合并和更新)的操作的文件条目。map 中存储的文件名都是唯一的,因为每个文件都包含一个 UUID

  50. val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles)

  51. // 从 DeltaLog version 范围内给定文件列表中生成文件列表

  52. val fileIndex = new TahoeBatchFileIndex(

  53. sparkSession, "delete", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot)

  54. // target 为替换文件 index 的 logical plan(LogicalPlan)

  55. // fileIndex 为新的文件 index(FileIndex)

  56. // 功能是将文件 index 替换为 logical plan,然后返回更新后的 plan

  57. val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)

  58. // 潜在被删除的文件对应的 Dataset

  59. val data = Dataset.ofRows(sparkSession, newTarget)

  60. val filesToRewrite =

  61. withStatusCode("DELTA", s"Finding files to rewrite for DELETE operation") {

  62. // 没有潜在被删除的文件

  63. if (numTouchedFiles == 0) {

  64. Array.empty[String]

  65. } else {

  66. // 过滤出需要被删除数据所在的文件

  67. // 返回一个包含此数据集中所有行的 Array

  68. data.filter(new Column(cond)).select(new Column(InputFileName())).distinct()

  69. .as[String].collect()

  70. }

  71. }

  72. scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

  73. if (filesToRewrite.isEmpty) {

  74. // Case 3.1: Delta 表没有找到需要被删除数据的文件,则不需要做任何操作,直接返回 Nil,另外也不需要记录事务日志

  75. Nil

  76. } else {

  77. // Case 3.2: some files need an update to remove the deleted files

  78. // Do the second pass and just read the affected files

  79. // Case 3.2: 获取需要删除的文件列表,并将需要删除文件里面不用删除的数据重新写到新文件中

  80. val baseRelation = buildBaseRelation(

  81. sparkSession, txn, "delete", tahoeFileIndex.path, filesToRewrite, nameToAddFileMap)

  82. // Keep everything from the resolved target except a new TahoeFileIndex

  83. // that only involves the affected files instead of all files.

  84. val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)

  85. // 潜在被删除的文件对应的 Dataset

  86. val targetDF = Dataset.ofRows(sparkSession, newTarget)

  87. // 将删除过滤条件取反,获取保留的数据,后面会将不用删除的数据写入新文件

  88. val filterCond = Not(EqualNullSafe(cond, Literal(true, BooleanType)))

  89. // 过滤不要删除的 Dataset

  90. val updatedDF = targetDF.filter(new Column(filterCond))

  91. // rewrittenFiles 为 新增的 AddFile 列表

  92. val rewrittenFiles = withStatusCode(

  93. "DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") {

  94. // 事务操作,将不需要删除的数据写入到新文件

  95. txn.writeFiles(updatedDF)

  96. }

  97. numRewrittenFiles = rewrittenFiles.size

  98. rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs

  99. val operationTimestamp = System.currentTimeMillis()

  100. // 需要被删除的文件和重新写数据的新增文件的集合

  101. removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++

  102. rewrittenFiles

  103. }

  104. }

  105. }

  106. // 如果匹配到需要删除的文件

  107. if (deleteActions.nonEmpty) {

  108. // 写事务日志,即在 _delta_log 目录下写一个新文件,记录事务操作信息

  109. txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))

  110. }

  111. // 记录 Delta Delete 操作的事件信息

  112. recordDeltaEvent(

  113. deltaLog,

  114. "delta.dml.delete.stats",

  115. data = DeleteMetric(

  116. condition = condition.map(_.sql).getOrElse("true"),

  117. numFilesTotal,

  118. numTouchedFiles,

  119. numRewrittenFiles,

  120. scanTimeMs,

  121. rewriteTimeMs)

  122. )

  123. }

代码量还是比较多的,所以笔者在代码的大部分地方都加了注释,希望读者跟着代码更好地理解。

最后,需要强调的是,官方建议删除数据的时候提供分区过滤条件,这样可以避免扫描全表的数据,除非的确需要删除全表数据。

大结局

到此,笔者从源码层面分析了 Delta Lake 数据删除的整个流程,读者可以根据源码进行再次查看,加深印象。

另外,笔者将 Delta Lake 增删改方面的内容都更新完毕,读者可以结合这个系列的三篇文章,再温习一下,也可以参与 Delta Lake 项目开发,比如支持 SQL 方式增删改操作和增加权限管理等需求特性。