Delta Lake-深入理解表结构和结构演变
概述
数据,就像我们的经验一样,总是在不断发展和积累。为了更上步伐,我们的表结构必须适应新的数据,其中包括一些新的维度-一种新的方式来查看我们以前没有概念的事物。这些心理模型与表的结构没有什么不同,它们定义了我们如何对新信息进行分类和处理。
这将我们带到表结构管理。随着业务问题和需求随着时间的推移发展,数据的结构也随之变化。使用Delta Lake,随着数据的变化,合并新维度变得容易。用户可以访问简单的语义来控制其表的结构。这些工具包括表结构强化(可防止用户因错误或垃圾数据而无意中污染表数据)以及表结构演进(可用于使它们在属于列的情况下自动添加丰富数据的新列)。在此文中,我们将深入探讨这些工具的使用。
理解表结构(Understanding Table Schemas)
Apache Spark™中的每个DataFrame都包含一个结构,一个定义数据格式的蓝图,例如数据类型和列,以及元数据。使用Delta Lake,表的结构以JSON格式保存在事务日志中。
什么是表结构强制(Schema Enforcement)
表结构强制(也称为表结构验证)是Delta Lake中的一种安全措施,它通过拒绝对与表的模式不匹配的表的写入来确保数据质量。就像忙碌的餐厅的前台经理只接受预订一样,它会检查插入表中的数据中的每一列是否在其预期列的列表中(换句话说,每一列是否都有“预订”),以及拒绝所有不在列表中的列的写操作。
表结构强制如何工作?
Delta Lake在写入时对表结构进行验证,这意味着在写入时会检查目标表结构的兼容性。如果表结构不兼容,则Delta Lake将取消事务(不写入任何数据),并抛出异常以使用户知道为什么不匹配。
为了确定对表的写入是否兼容,Delta Lake使用以下规则。要写入的DataFrame:
- 列必须一致
不能包含目标表的结构中不存在的任何列。相反,如果输入的数据不包含表中的某一列,则可以,这些列将被简单地设置为空值。
- 数据类型必须一致
列数据类型必须与目标表中的列数据类型相同。如果目标表的列包含StringType数据,但DataFrame中的对应列包含IntegerType的数据,则“表结构强制”将引发异常并阻止写操作的进行。
- 列名区分大小写
不能包含仅大小写不同的列名。这意味着您不能在同一表中定义诸如“ Foo”和“ foo”之类的列。尽管Spark可用于区分大小写或不区分大小写(默认)模式,但Delta Lake保留大小写,但在存储架构时不区分大小写。存储和返回列信息时,Parquet区分大小写。为了避免潜在的错误,数据损坏或丢失问题(我们在Databricks亲身经历),我们决定添加此限制。
为了说明这一点,请看下面的代码,当试图将一些新计算的列追加到尚未设置为接受它们的Delta Lake表中时,会发生什么情况。
# Generate a DataFrame of loans that we'll append to our Delta Lake table
loans = sql("""
SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
CAST(rand(10) * 10000 * count AS double) AS amount
FROM loan_by_state_delta
""")
# Show original DataFrame's schema
# 查看原始的dataframe的结构
original_loans.printSchema()
"""
root
|-- addr_state: string (nullable = true)
|-- count: integer (nullable = true)
"""
# Show new DataFrame's schema
# 查看新的dataframe的结构
loans.printSchema()
"""
root
|-- addr_state: string (nullable = true)
|-- count: integer (nullable = true)
|-- amount: double (nullable = true) # new column
"""
# Attempt to append new DataFrame (with new column) to existing table
# 尝试把心的dataframe(带有新列)添加到存在的表中
loans.write.format("delta") \
.mode("append") \
.save(DELTALAKE_PATH)
# 此时将会返回一个结构不匹配的错误
""" Returns:
A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.
"""
Delta Lake不会自动添加新列,而是强制检查表结构并阻止写入。为了帮助确定导致不匹配的列,Spark在堆栈跟踪中打印出了两种表结构以便进行比较。
表结构强制有何用处?
由于执行了如此严格的检查,因此“表结构强制”是一个非常好的工具,可以用来准备用于生产或干净的,完全转换的数据集的保证。通常在直接提供的表数据上执行:
- 机器学习算法
- BI仪表盘展示
- 数据分析和可视化工具
- 任何需要高度结构化,强类型语义模式的生产系统
为了准备高质量的数据,许多用户采用了一种简单的“多跳”架构,该架构逐渐将表结构添加到表中。要了解更多信息,请参阅标题为《 Delta Lake的生产化机器学习》的文章。
当然,可以在管道的任何位置使用“表结构强制”,但是要注意,有时候写入数据流时可能失败,例如,您忘记了在输入的数据中添加了单个列。
防止数据稀释(Preventing Data Dilution)
此时,您会问自己,可能出现什么问题?毕竟,有时会出现意外的“表结构不匹配”的错误,这可能会使您的工作流程陷入瘫痪,特别是如果您不熟悉Delta Lake。为什么不随便更改表结构,但无论如何都要进行更改,以便无论如何我都可以编写DataFrame?
俗话说,“一分预防胜过一磅治疗。”在某些时候,如果您不“强制表结构”,则数据类型兼容性问题将会是头疼的问题—看起来原始数据的同质来源可能包含边缘情况,损坏的列,变形的映射或其他在夜间突然发生的可怕事件。更好的方法是使用表结构强制措施来防止这些问题的发生,而不是等到它们出现了再去解决它们,因为它们会潜伏在您的生产代码的阴影中。
表结构强制使您可以放心,除非您做出确定的选择来更改表的结构,否则表的模式不会更改。它可以防止数据“稀释”,这种情况在频繁添加新列时可能发生,以至于以前丰富而简洁的表由于数据泛滥而失去了其含义和可用性。通过有意识,设置高标准,表结构强制完全按照其设计意图进行工作–-保持诚实,保持表的清洁。
如果经过进一步审查后,您确定确实要添加新列,则此操作很简单,只需一行,如下所述。解决方案是表结构演进!
什么是表结构演进?
表结构演进是一项功能,允许用户轻松更改表的当前结构以适应随时间变化的数据。最常见的是,在执行追加或覆盖操作时使用它来自动调整架构以包括一个或多个新列。
表结构演进如何工作?
遵循上一节中的示例,开发人员可以轻松地使用表结构演进来添加先前由于表结构不匹配而被拒绝的新列。通过在.write或.writeStream Spark命令中添加.option(‘mergeSchema’,‘true’)来**表结构演进。
# Add the mergeSchema option
loans.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save(DELTALAKE_SILVER_PATH)
要查看该图,请执行以下Spark SQL语句。
# Create a plot with the new column to confirm the write was successful
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10
另外,您可以通过在Spark配置中添加spark.databricks.delta.schema.autoMerge = True,为整个Spark会话设置此选项。请谨慎使用,因为“表结构强制”将不再警告您意外的表结构不匹配。
通过在查询中包含mergeSchema选项,DataFrame中存在但目标表中不存在的所有列将作为写事务的一部分自动添加到表结构的末尾。还可以添加嵌套字段,并且这些字段也将添加到其各自的struct列的末尾。
数据工程师和科学家可以使用此选项在其现有的机器学习生产表中添加新列(也许是新跟踪的指标,或本月销售数字的列),而不会破坏依赖旧列的现有模型。
在表追加或覆盖期间,以下类型的架构更改可用于架构演变:
- 添加新列(这是最常见的情况)
- 从NullType->任何其他类型更改数据类型,或从ByteType-> ShortType-> IntegerType更改数据
其他不适合表结构演进的更改要求通过添加.option(“ mergeSchema”,“ true”)覆盖架构和数据。例如,如果“ Foo”列最初是整数数据类型,而新模式将是字符串数据类型,则所有Parquet(数据)文件都需要重写。这些更改包括:
- 删除列
- 更改现有列的数据类型(就地)
- 重命名仅因大小写而异的列名(例如“ Foo”和“ foo”)
最后,在即将发布的Spark 3.0中,将完全支持显式DDL(使用ALTER TABLE),从而允许用户对表架构执行以下操作:
- 添加列
- 更改列注释
- 设置定义表行为的表属性,例如设置事务日志的保留期限
模式演化有何用处?
表结构演进可以在您打算更改表结构的任何时候使用(与您不小心将不应该存在的列添加到DataFrame的操作相反)。这是迁移表结构的最简单方法,因为它会自动添加正确的列名和数据类型,而无需显式声明它们。
总结
表结构强制会拒绝所有与表不兼容的新列或其他表结构的改变。通过制定和遵守这些高标准,分析人员和工程师可以相信他们的数据具有最高的完整性,并且可以清晰地进行推理,从而使他们能够做出更好的业务决策。
另一方面,表结构演进使自动进行预期的表结构更改变得容易,从而补充了表结构强制。毕竟,添加一列并不难。
表结构强制是表结构演进的核心。当一起使用时,这些功能比以往更容易防止质量不好的数据。
参考原文
- https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html
上一篇: 关于webpack一些注意事项