
Delta Lake Test Cases


Reference: my earlier survey of the official Delta Lake documentation.

In this post I write a few test cases to exercise Delta Lake's features. The project is built with sbt, and the data is written to my local disk.

I. Creating the project

Component    Version
sbt          1.4.2
Scala        2.12.10
Spark        3.0.0
Delta Lake   0.7.0

The build.sbt file:

name := "DaltaLake"
version := "0.1"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "io.delta" %% "delta-core" % "0.7.0"
libraryDependencies += "log4j" % "log4j" % "1.2.17"

II. Test cases

1. Creating data manually

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object SparkDataInot {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Test"))

    // reuses the SparkContext created above
    val spark = SparkSession.builder().getOrCreate()
    val rdd = sc.makeRDD(Seq(1, 2, 3, 4))

    import spark.implicits._

    val df = rdd.toDF("id")

    df.write.format("delta").save("/Users/smzdm/Documents/data/0")
    // df.write.format("delta").save("DeltaTable") // a relative path also works for local testing; the table is stored under the project directory

    df.show()
    sc.stop()
    spark.stop()
  }
}
-----------------------------------
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+

Contents of 00000000000000000000.json in the table's _delta_log directory:

{"commitInfo":{"timestamp":1611823662796,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"5","numOutputBytes":"2007","numOutputRows":"4"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"0b3ba27f-6a52-48e4-b9c4-6a1c3818a25c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1611823659077}}
{"add":{"path":"part-00000-cd0851fa-df9c-48c9-b94e-24ad9bfa2b33-c000.snappy.parquet","partitionValues":{},"size":299,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00001-a098f9cb-ceaa-4ef7-aa5e-10ba6f90fe7f-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00003-f62f5dac-6fe7-4cee-bf43-b1488996db0e-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00005-4de1c7f7-d399-483b-bc7f-77e7b21dd017-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00007-17f127f0-f378-4a74-9b92-97ac65f57c23-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}

Each subsequent write to the table generates another WRITE commit JSON like the one below; I won't repeat it for every step.

{"commitInfo":{"timestamp":1611828204517,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":3,"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"886","numOutputRows":"4"}}}
{"add":{"path":"part-00000-cb4cdcc9-5e71-4f45-ae38-01c1a526e3c4-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611828204000,"dataChange":true}}
{"remove":{"path":"part-00000-9374a554-221f-4fc6-8197-486cc61d9dc5-c000.snappy.parquet","deletionTimestamp":1611828204512,"dataChange":true}}

2. Testing schema updates on a Delta table (Spark itself can also update a schema)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

/**
 * Test case: add new columns to the schema
 */
object DeltaSchema {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val df = spark.read.format("delta").load("/Users/smzdm/Documents/data/0")

    val df1 = df.withColumn("rn", row_number().over(Window.orderBy("id")))
      .withColumn("flag", when(col("rn") === lit(1), lit(1)).otherwise(lit(0)))

    df1.show()

    df1.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}
-----------------------------------
+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 2| 2| 0|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+

Contents of 00000000000000000001.json in the _delta_log directory:

{"commitInfo":{"timestamp":1611824104223,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"886","numOutputRows":"4"}}}
{"metaData":{"id":"0b3ba27f-6a52-48e4-b9c4-6a1c3818a25c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"rn\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"flag\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1611823659077}}
{"add":{"path":"part-00000-5f96937a-30aa-4e23-af03-f0679367ca7c-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611824103000,"dataChange":true}}
{"remove":{"path":"part-00001-a098f9cb-ceaa-4ef7-aa5e-10ba6f90fe7f-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00003-f62f5dac-6fe7-4cee-bf43-b1488996db0e-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00000-cd0851fa-df9c-48c9-b94e-24ad9bfa2b33-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00007-17f127f0-f378-4a74-9b92-97ac65f57c23-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00005-4de1c7f7-d399-483b-bc7f-77e7b21dd017-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}

3. Updating data

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import io.delta.tables._

/**
 * Test case: update
 */
object DeltaUpdate {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs can be set here or at submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // these two configs can be set here or at submit time
      .appName("update")
      .master("local[*]").getOrCreate()

    import spark.implicits._

    // load the path as a DeltaTable
    val ddf = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")

    // update(predicate, Map(column -> update expression))
    ddf.update(col("id") > 2, Map("id" -> (col("id") + 10)))

    val df = ddf.toDF

    df.show()

    // note: update() above has already committed the change; this overwrite just rewrites the same data
    df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}
-----------------------------------
+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 2| 2| 0|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+

Contents of 00000000000000000002.json in the _delta_log directory:

{"commitInfo":{"timestamp":1611828239222,"operation":"UPDATE","operationParameters":{"predicate":"(id#395 > 2)"},"readVersion":4,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numAddedFiles":"1","numUpdatedRows":"2","numCopiedRows":"2"}}}
{"remove":{"path":"part-00000-cb4cdcc9-5e71-4f45-ae38-01c1a526e3c4-c000.snappy.parquet","deletionTimestamp":1611828238847,"dataChange":true}}
{"add":{"path":"part-00000-cf10bc40-bd4b-4a41-9897-2fc0fe2cfe09-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611828239000,"dataChange":true}}

4. Deleting data

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object DeltaDelete {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs can be set here or at submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // these two configs can be set here or at submit time
      .appName("delete")
      .master("local[*]").getOrCreate()

    // load the path as a DeltaTable
    val ddf = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")

    ddf.delete(col("id") === 2)

    val df = ddf.toDF

    df.show()

    // note: delete() above has already committed the change; this overwrite just rewrites the same data
    df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}

-----------------------------------

+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+

Contents of 00000000000000000003.json in the _delta_log directory:

{"commitInfo":{"timestamp":1611828364666,"operation":"DELETE","operationParameters":{"predicate":"[\"(`id` = 2)\"]"},"readVersion":6,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numDeletedRows":"1","numAddedFiles":"1","numCopiedRows":"3"}}}
{"remove":{"path":"part-00000-3067922b-8702-49df-a903-aa778462d95b-c000.snappy.parquet","deletionTimestamp":1611828364655,"dataChange":true}}
{"add":{"path":"part-00000-e2f90118-2525-4614-8015-42c2c2545018-c000.snappy.parquet","partitionValues":{},"size":876,"modificationTime":1611828364000,"dataChange":true}}

5. Historical versions of the data

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

/**
 * View the table history,
 * read back a specific historical version,
 * and clean up old versions.
 */
object DeltaDataVersion {
  def main(args: Array[String]): Unit = {

    // version to travel back to
    val versionCode = 1
    // val versionCode = args(0).trim

    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs can be set here or at submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // these two configs can be set here or at submit time
      .appName("delete")
      .master("local[*]").getOrCreate()

    val df = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")

    // show the table history
    df.history().toDF().show(false)

    // time travel by version number (versionAsOf) or by timestamp (timestampAsOf); pick one
    val dff = spark.read.format("delta").option("versionAsOf", versionCode).load("/Users/smzdm/Documents/data/0")
    // val dff = spark.read.format("delta").option("timestampAsOf", "2021-01-28 18:03:59").load("/Users/smzdm/Documents/data/0")

    // vacuum files older than 100 hours; with no argument the default retention is 7 days
    // (files older than 7 days are removed). Note: a retention shorter than the default
    // 168 hours is rejected unless spark.databricks.delta.retentionDurationCheck.enabled is set to false.
    df.vacuum(100)

    dff.show()

    spark.stop()
  }
}

6. Testing a merge between two tables

Merge took the longest to test and ran into many different situations, so the data shown here may not be entirely accurate.
The previous post mentioned automatic schema evolution, and it is tested here as well. A few additional notes:
1. A Cartesian-product match (several source rows matching one target row) raises an error; the data must be pre-processed beforehand (see the sketch after this list).
2. Enable automatic schema evolution only when you need it.
3. After building a merge, calling execute() triggers it; there is no need to write the result back to the table.
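
A minimal sketch of that pre-processing, using a toy in-memory source (the object name and the sample rows are made up for the example): one row is kept per id, so the merge condition t.id = t1.id can never match a single target row to two source rows.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object DedupSourceForMerge {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dedupSource").getOrCreate()
    import spark.implicits._

    // toy source with a duplicated key: id = 4 appears twice
    val source = Seq((4, 1), (4, 2), (1, 1)).toDF("id", "rn")

    // keep a single row per id (here the one with the largest rn); the result is
    // safe to use as a merge source because each id occurs exactly once
    val deduped = source
      .withColumn("rk", row_number().over(Window.partitionBy("id").orderBy(col("rn").desc)))
      .filter(col("rk") === 1)
      .drop("rk")

    deduped.show()
    spark.stop()
  }
}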

import io.delta.tables._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Merging two tables.
 * merge ... update: used with whenMatched(condition); updates only the matched rows.
 * merge ... insert: used with whenNotMatched(condition); inserts rows that have no match.
 *
 * Combined: merge ... update ... insert gives "update if present, insert if absent".
 *
 * Tested:
 * update: works
 * insert: works
 * Cartesian-product match: fails; the keys must be pre-processed beforehand
 * merge between tables with different schemas, with automatic and with default schema-evolution mode
 *
 */

object DeltaMerge {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs can be set here or at submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // these two configs can be set here or at submit time
      .config("spark.databricks.delta.schema.autoMerge.enabled", true) // enable automatic schema evolution (optional)
      .appName("merge")
      .master("local[*]")
      .getOrCreate()

    val ddf = DeltaTable.forPath(spark, "DeltaTable").toDF.withColumn("rn", lit(1)).withColumn("flag", lit(100))
    val ddf1 = DeltaTable.forPath("/Users/smzdm/Documents/data/0")

    val df = ddf.withColumn("rn", col("rn") + 2)
      .withColumn("A", lit("a"))
      .withColumn("B", lit("B"))

    print("=============t====================")
    ddf.toDF.show()
    print("----------------t1-----------------")
    ddf1.toDF.show()
    print("==============df===================")
    df.show()

    // merge ... update: used with whenMatched(condition); Map(target column -> source expression)
    ddf1.as("t1")
      .merge(ddf.as("t"), "t.id=t1.id")
      .whenMatched("t.id=4")
      .updateExpr(Map("t1.rn" -> "t.rn")) // only updates the matched ids
      //.updateAll()  // update every matched row
      .execute()

    // merge ... insert: used with whenNotMatched(condition); Map(target column -> source expression)
    ddf1.as("t1")
      .merge(ddf.as("t"), "t.id=t1.id")
      .whenNotMatched("t.id=4") // with a condition
      .insertExpr(Map("t1.rn" -> "t.rn")) // inserts source rows that have no match in the target
      //        .whenNotMatched()
      //      .insertAll() // insert every unmatched row
      .execute()

    // Cartesian-product test: this fails. As the docs note, the keys must be pre-processed
    // beforehand; if several source rows match one target row, Delta cannot tell which update to apply.
    ddf1.as("t1")
      .merge(df.as("t"), "t.id=t1.id")
      .whenMatched()
      .updateAll()
      .whenNotMatched()
      .insertAll()
      .execute()

    ddf1.toDF.show()

    spark.stop()
  }
}

7. Streaming and batch on the same table

Still testing...
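
While that test is still in progress, the rough shape of it is sketched below, under stated assumptions: a fresh local table path and a checkpoint path are made up for the example, and the rate source stands in for real input. A Structured Streaming query appends to a Delta table while an ordinary batch read queries the same table.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object DeltaStreamBatch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("streamBatch").getOrCreate()

    // hypothetical paths for this sketch
    val tablePath = "/Users/smzdm/Documents/data/stream"
    val checkpointPath = "/Users/smzdm/Documents/data/stream_checkpoint"

    // streaming side: continuously append rows to the Delta table
    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .load()
      .selectExpr("CAST(value AS INT) AS id")
      .writeStream
      .format("delta")
      .option("checkpointLocation", checkpointPath)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start(tablePath)

    // give the stream time to commit a few micro-batches
    query.awaitTermination(30000)

    // batch side: a plain read of the same table sees whatever has been committed so far
    spark.read.format("delta").load(tablePath).show()

    query.stop()
    spark.stop()
  }
}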

Summary

To be continued...