欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Delta Lake

程序员文章站 2022-07-14 20:39:10
...

Delta Lake 简介

Delta Lake是一个可靠的开源存储层,它提供ACID事务,可伸缩的元数据处理,并支持流/批统一。Delta Lake可以运行在现有数据湖之上,并完全和Apache Spark APIs兼容

Delta Lake 具体提供如下特性:

  • Spark上的ACID事务:可序列化的隔离级别确保Reader永远看不到不一致的数据;

  • 可扩展的元数据处理:利用Spark的分布式处理能力,可以轻松处理数十亿个文件的PB级表的所有元数据;

  • 流/批统一:Delta Lake中的表既是批处理表,又是流的源或接收器。流数据提取,批处理历史回填,交互式查询都可以直接使用;

  • 强模式性:自动处理模式变化,防止在摄取过程中插入不良记录;

  • 时间旅行:数据版本控制支持回滚,完整的历史审核跟踪以及可重复的机器学习实验;

  • Upserts和Deletes:支持合并,更新和删除操作,以启用复杂的用例,例如更改数据捕获,缓慢变化尺寸(SCD)操作,流化Upserts等。

Delta Lake 快速开始

本指南可帮助您快速探索三角洲湖的主要特征。它提供了代码片段,显示了如何从交互式,批处理和流查询中读取和写入Delta表。

设置Apache Spark支持Delta Lake

Delta Lake要求Apache Spark在2.4.2以上。按照以下说明设置Spark使用Delta Lake。你可以通过如下两种方式在本地计算机上运行本文档中的步骤。

  1. 交互式运行:启动带Delta Lake的Spark Shell(Scala或者Python),在Shell中交互式运行代码片段;

  2. 以项目运行:创建一个支持Delta Lake 的 Maven或者SBT(Scala或者Java)项目,拷贝代码片段到源文件中,并运行项目。

启动交互式Shell

要在Spark Shell中交互使用Delta Lake,需要在本地安装Apache Spark,根据是使用Python还是Scala可以分别启动PySpark或SparkShell。

PySpark

如果你需要安装或者更新PySpark,运行如下:

pip install --upgrade pyspark

运行带DeltaLake的PySpark

pyspark --packages io.delta:delta-core_2.11:0.5.0
Spark Scala Shell

通过下载下载最新版本的Apache Spark(2.4.2以上版本),使用pip或者下载并解压归档文件,然后解压目录中运行spark-shell。

运行带Delta Lake包的spark-shell

$SPARK_HOME/bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0

注:如果看到以下错误,请确保Apache Spark和delta-core是为相同的Scala版本(2.11或2.12)构建的。下载页面中的Apache Spark-2.4.3的预构建发行版是使用Scala-2.11构建的:

java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated

更多信息查看issue

项目启动

如果要使用Maven*存储库中的Delta Lake二进制文件构建项目,则可以使用以下Maven坐标。

Maven

通过将其作为依赖项添加到POM文件中,将Delta Lake包含在Maven项目中。Delta Lake与Scala 2.11和2.12版本交叉编译;选择与您的项目匹配的版本。如果您正在编写Java项目,则可以使用任何一个版本。

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.11</artifactId>
  <version>0.5.0</version>
</dependency>
SBT

通过将以下行添加到build.sbt文件中,将Delta Lake包括在SBT项目中:

libraryDependencies += "io.delta" %% "delta-core" % "0.5.0"

创建表

将DataFrame按delta格式写出到磁盘上来创建一个Delta表,你可以使用已有的SparkSQL代码,将原来的parquet、csv、json等格式更换成delta即可。

  • Python
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
  • scala
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
  • java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

SparkSession spark = ...   // create SparkSession

Dataset<Row> data = data = spark.range(0, 5);
data.write().format("delta").save("/tmp/delta-table");

这些选项根据DataFrame推断出schema来创建一个新的Delta表。有关创建新Delta表时可用的全部选项的信息,请参见创建表写数据到表

注:此快速入门将本地路径用于Delta表位置。有关为增量表配置HDFS或云存储的信息,请参阅增量存储。

读取数据

您可以通过指定文件的路径来读取Delta表中的数据,例如"/tmp/delta-table":

  • Python
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
  • scala
val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
  • java
Dataset<Row> df = spark.read().format("delta").load("/tmp/delta-table");
df.show();

更新表数据

Delta Lake支持几种使用标准DataFrame API修改表的操作。本示例运行批处理以覆盖表中的数据:

覆盖写

  • Python
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
  • scala
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
df.show()
  • java
Dataset<Row> data = data = spark.range(5, 10);
data.write().format("delta").mode("overwrite").save("/tmp/delta-table");

如果重新读取表数据,因为数据已经本覆盖,所以只能读取到5-9。

有条件的更新数据

Delta Lake提供了编程API,可以有条件地将数据更新,删除和合并(向上插入)到表中。这里有一些例子。

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta-table")

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))

// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

// Upsert (merge) new data
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData")
  .merge(
    newData.as("newData"),
    "oldData.id = newData.id")
  .whenMatched
  .update(Map("id" -> col("newData.id")))
  .whenNotMatched
  .insert(Map("id" -> col("newData.id")))
  .execute()

deltaTable.toDF.show()

Java

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");

// Update every even value by adding 100 to it
deltaTable.update(
  functions.expr("id % 2 == 0"),
  new HashMap<String, Column>() {{
    put("id", functions.expr("id + 100"));
  }}
);

// Delete every even value
deltaTable.delete(condition = functions.expr("id % 2 == 0"));

// Upsert (merge) new data
Dataset<Row> newData = spark.range(0, 20).toDF();

deltaTable.as("oldData")
  .merge(
    newData.as("newData"),
    "oldData.id = newData.id")
  .whenMatched()
  .update(
    new HashMap<String, Column>() {{
      put("id", functions.col("newData.id"));
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, Column>() {{
      put("id", functions.col("newData.id"));
    }})
  .execute();

deltaTable.toDF().show();

您应该看到一些现有行已更新,并且已插入新行。

有关这些操作的更多信息,请参见表删除,更新和合并

时间旅行实现读取历史版本的数据

您可以使用称为时间旅行的功能查询Delta表的先前快照。如果要访问覆盖的数据,则可以使用versionAsOf选项查询表的快照,然后覆盖第一组数据。

Python

df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

Scala

val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

Java

Dataset<Row> df = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table");
df.show();

你可以查看覆盖之前的数据。时间旅行是一项非常强大的功能,它利用Delta Lake事务日志的功能来访问表中不再存在的数据。删除版本0选项(或指定版本1)将使您再次看到较新的数据。有关更多信息,请参阅查询表的旧快照(时间旅行)

将Stream写入表

您也可以使用Structured Streaming写入Delta表。即使有其他流或批查询同时针对表运行,Delta Lake事务日志也可以保证一次处理。默认情况下,流以追加模式运行,这会将新记录添加到表中:

Python

streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

Scala

val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

Java

import org.apache.spark.sql.streaming.StreamingQuery;

Dataset<Row> streamingDf = spark.readStream().format("rate").load();
StreamingQuery stream = streamingDf.selectExpr("value as id").writeStream().format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table");

流运行时,您可以使用较早的命令读取表。

注:如果您在shell中运行此命令,则可能会看到流任务的进度,这使得在该shell中键入命令变得困难。在新终端中启动另一个Shell以查询表可能会很有用。

您可以通过stream.stop()在启动流的同一终端上运行来停止流。

有关Delta Lake与结构化流集成的更多信息,请参阅表流读取和写入

从表中读取更改的Stream

在将流写入Delta表时,您还可以从该表中读取流作为源。例如,您可以启动另一个流查询,打印出对Delta表所做的所有更改。

Python

stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

Scala

val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

Java

StreamingQuery stream2 = spark.readStream().format("delta").load("/tmp/delta-table").writeStream().format("console").start();

批程序读取和写入表

Delta Lake支持Apache Spark DataFrame读写API提供的大多数选项,用于在表上执行批量读写。

创建表

使用DataFrameWriter(Scala、Java或Python)作为原子操作将数据写入Delta Lake。至少必须指定格式delta:

df.write.format("delta").save("/delta/events")

分区数据

您可以对数据进行分区以加快查询或具有涉及分区列的谓词的DML。要在创建增量表时对数据进行分区,请按列指定分区。常见的模式是按日期分区,例如:

scala

df.write.format("delta").partitionBy("date").save("/delta/events")

从表中读取数据

您可以通过指定路径将Delta表作为DataFrame加载:

Scala

spark.read.format("delta").load("/delta/events")

查询表的旧快照(时间旅行)

Delta Lake时间旅行允许您查询Delta表的旧快照。时间旅行有许多用例,包括:

  • 重新创建分析,报告或输出(例如,机器学习模型的输出)。这对于调试或审核尤其有用,特别是在受管制的行业中。

  • 编写复杂的时间查询。

  • 修正数据中的错误。

  • 为快速更改表的一组查询提供快照隔离。

本节介绍了查询表的较旧版本时所支持的方法,数据保留问题并提供了示例。

语法

有几种查询旧版Delta表的方法。

  • DataFrameReader选项

DataFrameReader选项允许您从固定到表的特定版本的Delta表创建DataFrame。

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

对于timestamp_string,仅接受日期或时间戳记字符串。例如"2019-01-01"和"2019-01-01’T’00:00:00.000Z"。

一种常见的模式是在执行Databricks作业期间使用Delta表的最新状态来更新下游应用程序。

写入表

使用数据框追加

使用append模式,您可以将新数据原子添加到现有的Delta表中:

df.write.format("delta").mode("append").save("/delta/events")

使用DataFrames覆盖

要自动替换表中的所有数据,可以使用overwrite模式:

df.write.format("delta").mode("overwrite").save("/delta/events")

您可以有选择地仅覆盖分区列上与谓词匹配的数据。以下命令用中的数据原子替换一月df:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/delta/events")

此示例代码在中写入数据df,验证所有数据均位于指定分区内,并执行原子替换。

注:与Apache Spark中的文件API不同,Delta Lake会记住并强制执行表的Schema。这意味着默认情况下,覆盖不会替换现有表的Schema。

有关Delta Lake支持更新表的信息,请参阅更新表

Schema 验证

Delta Lake自动验证正在写入的DataFrame的Schema与表的Schema兼容。Delta Lake使用以下规则来确定从DataFrame到表的写入是否兼容:

  • 所有DataFrame列都必须存在于目标表中。如果表中不存在DataFrame中的列,则会引发异常。表中存在但DataFrame中不存在的列设置为null。

  • DataFrame列数据类型必须与目标表中的列数据类型匹配。如果它们不匹配,则会引发异常。

  • DataFrame列名称只能大小写不同。这意味着您不能在同一表中定义诸如“ Foo”和“ foo”之类的列。虽然可以在区分大小写或不区分大小写(默认)模式下使用Spark,但在存储和返回列信息时,Parquet区分大小写。Delta Lake保留大小写,但在存储Schema时不敏感,并且具有此限制以避免潜在的错误,数据损坏或丢失问题。

如果您指定其他选项(例如partitionBy与附加模式结合使用),则Delta Lake会验证它们是否匹配,并为任何不匹配项引发错误。如果partitionBy不存在,则追加将自动跟随现有数据的分区。

自动更新 schema

Delta Lake可以作为DML事务的一部分(附加或覆盖)自动更新表的Schema,并使该schema与正在写入的数据兼容。

添加列

在以下情况下,DataFrame中存在但表中缺失的列将作为写事务的一部分自动添加:

  • write或writeStream有.option(“mergeSchema”, “true”)

添加的列将追加到它们所在的结构的末尾。追加新列时将保留大小写。

NullType列

由于Parquet不支持NullType,NullType因此在写入Delta表时会将列从DataFrame中删除,但仍存储在Schema中。当为该列接收到不同的数据类型时,Delta Lake会将Schema合并到新的数据类型。如果Delta Lake收到NullType现有列的,则在写入过程中将保留旧模式,并删除新列。

NullType不支持流式传输。由于必须在使用流式传输时设置模式,因此这种情况很少见。NullType也不适用于诸如ArrayType和的复杂类型MapType。

替代表 Schema

默认情况下,覆盖表中的数据不会覆盖schema。当覆盖使用表mode(“overwrite”)没有replaceWhere,你可能仍然要覆盖写入的数据的schema。通过将overwriteSchema选项设置为true,可以替换表的schema和分区:

df.write.option("overwriteSchema", "true")

表的视图

Delta Lake支持在Delta表之上创建视图,就像使用数据源表一样。

使用视图进行操作时的核心挑战是解决模式。如果更改Delta表Schema,则必须重新创建派生视图以说明对该Schema的任何添加。例如,如果将新列添加到Delta表中,则必须确保该列在该基表顶部构建的适当视图中可用。

流程序读取和写入表

Delta Lake与Spark Structured Streaming 通过readStream和writeStream深度集成。Delta Lake克服了常见的与流系统和文件相关的许多限制,包括:

  • 保持多个流(或并发批处理作业)的“仅一次”处理

  • 使用文件作为流的源时有效地发现哪些文件是新文件

Delta表作为Stream源

当您将Delta表加载为流源并在流查询中使用它时,该查询将处理表中存在的所有数据以及流启动后到达的所有新数据。

spark.readStream.format("delta").load("/delta/events")

您还可以通过设置maxFilesPerTrigger选项来控制Delta Lake提供给流的任何微批处理的最大大小。这指定了每个触发器中关联的新文件最大数量。默认值为1000。

忽略更新和错误

Structured Streaming 只处理追加的输入,并且如果对用作源的表进行了任何修改,则抛出异常。有两种主要的处理无法自动向下游传播的更改的策略:

  • 由于默认情况下Delta表保留所有历史记录,因此在许多情况下,您可以删除输出和检查点并从头开始重新启动流。

  • 您可以设置以下两个选项之一:

    • ignoreDeletes忽略在分区边界删除数据的事务。例如,如果您的源表按日期进行了分区,并且删除了30天之前的数据,则该删除将不会传播到下游,但是流可以继续运行。

    • ignoreChanges 如果文件在被诸如更新、合并写入、删除分区或者覆盖时会被重新写入到源文件,从而导致重新更新。不变的行可能仍会发出,因此您的下游使用者应该能够处理重复项。删除不会传播到下游。ignoreChanges包括ignoreDeletes,因此,如果您使用ignoreChanges,则流将不会被源表的删除或更新中断。

例子

假如你有一个表user_events,字段包括date、user_email和action,按date进行了分区。您从user_events表中提取数据,但是由于GDPR的关系,您需要从中删除一些数据。

events.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/delta/user_events")

然而,如果你必须基于user_email删除数据,那么你需要使用:

events.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/delta/user_events")

如果你使用UPDATE语句更新user_email,则包含user_email的文件都会被重写。如果使用了ignoreChanges,更新的记录和文件中未被修改的记录会被一并传播到下游。你的逻辑应该要能够处理这些传入重复的记录。

将Delta表作为一个Sink源

您也可以使用结构化流将数据写入Delta表。事务日志使Delta Lake能够保证一次处理,即使针对该表同时运行其他流或批查询。

追加模式

默认情况下,流以追加模式运行,这会将新记录添加到表中。

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events") // as a path

完全模式

您还可以使用Structured Streaming在每个批次替换整个表。示例使用聚合来计算摘要:

spark.readStream
  .format("delta")
  .load("/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/delta/eventsByCustomer")

前面的示例不断更新包含客户事件总数的表。

对于延迟要求更宽松的应用程序,您可以使用一次性触发器来节省计算资源。使用这些更新按给定的时间表更新汇总聚合表,仅处理自上次更新以来已到达的新数据。