Delta Lake Test Cases
In this post I write a few test cases to exercise Delta Lake's features. The project is created with sbt, and the data is written to my local machine.
I. Creating the project
| Component | Version |
|---|---|
| sbt | 1.4.2 |
| Scala | 2.12.10 |
| Spark | 3.0.0 |
| Delta Lake | 0.7.0 |
The build.sbt file:
name := "DeltaLake"
version := "0.1"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "io.delta" %% "delta-core" % "0.7.0"
libraryDependencies += "log4j" % "log4j" % "1.2.17"
II. Test cases
1. Creating data manually
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object SparkDataInot {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Test"))
    val spark = SparkSession.builder().getOrCreate()
    val rdd = sc.makeRDD(Seq(1, 2, 3, 4))
    import spark.implicits._
    val df = rdd.toDF("id")
    df.write.format("delta").save("/Users/smzdm/Documents/data/0")
    // df.write.format("delta").save("DeltaTable")  // a relative path also works for local tests; the table is stored under the project directory
    df.show()
    sc.stop()
    spark.stop()
  }
}
-----------------------------------
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+
Contents of 00000000000000000000.json in the table's _delta_log directory:
{"commitInfo":{"timestamp":1611823662796,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"5","numOutputBytes":"2007","numOutputRows":"4"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"0b3ba27f-6a52-48e4-b9c4-6a1c3818a25c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1611823659077}}
{"add":{"path":"part-00000-cd0851fa-df9c-48c9-b94e-24ad9bfa2b33-c000.snappy.parquet","partitionValues":{},"size":299,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00001-a098f9cb-ceaa-4ef7-aa5e-10ba6f90fe7f-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00003-f62f5dac-6fe7-4cee-bf43-b1488996db0e-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00005-4de1c7f7-d399-483b-bc7f-77e7b21dd017-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
{"add":{"path":"part-00007-17f127f0-f378-4a74-9b92-97ac65f57c23-c000.snappy.parquet","partitionValues":{},"size":427,"modificationTime":1611823662000,"dataChange":true}}
Every subsequent write to the table appends another WRITE commit JSON like the one below, so I won't repeat them for each step.
{"commitInfo":{"timestamp":1611828204517,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":3,"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"886","numOutputRows":"4"}}}
{"add":{"path":"part-00000-cb4cdcc9-5e71-4f45-ae38-01c1a526e3c4-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611828204000,"dataChange":true}}
{"remove":{"path":"part-00000-9374a554-221f-4fc6-8197-486cc61d9dc5-c000.snappy.parquet","deletionTimestamp":1611828204512,"dataChange":true}}
2. Updating the schema of a Delta table (Spark itself can already handle schema updates)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

/**
 * Test case: add new columns to the schema
 */
object DeltaSchema {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.read.format("delta").load("/Users/smzdm/Documents/data/0")
    val df1 = df.withColumn("rn", row_number().over(Window.orderBy("id")))
      .withColumn("flag", when(col("rn") === lit(1), lit(1)).otherwise(lit(0)))
    df1.show()
    df1.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}
-----------------------------------
+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 2| 2| 0|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+
Contents of 00000000000000000001.json in the _delta_log directory:
{"commitInfo":{"timestamp":1611824104223,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"886","numOutputRows":"4"}}}
{"metaData":{"id":"0b3ba27f-6a52-48e4-b9c4-6a1c3818a25c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"rn\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"flag\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1611823659077}}
{"add":{"path":"part-00000-5f96937a-30aa-4e23-af03-f0679367ca7c-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611824103000,"dataChange":true}}
{"remove":{"path":"part-00001-a098f9cb-ceaa-4ef7-aa5e-10ba6f90fe7f-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00003-f62f5dac-6fe7-4cee-bf43-b1488996db0e-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00000-cd0851fa-df9c-48c9-b94e-24ad9bfa2b33-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00007-17f127f0-f378-4a74-9b92-97ac65f57c23-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
{"remove":{"path":"part-00005-4de1c7f7-d399-483b-bc7f-77e7b21dd017-c000.snappy.parquet","deletionTimestamp":1611824104202,"dataChange":true}}
3. Updating data
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import io.delta.tables._

/**
 * Test case: update
 */
object DeltaUpdate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs must be set either here or at spark-submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .appName("update")
      .master("local[*]").getOrCreate()
    import spark.implicits._
    // load the path as a DeltaTable
    val ddf = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")
    // update(condition, Map(column -> update expression))
    ddf.update(col("id") > 2, Map("id" -> (col("id") + 10)))
    val df = ddf.toDF
    df.show()
    // note: update() already commits to the table, so this write adds an extra commit on top
    df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}
-----------------------------------
+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 2| 2| 0|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+
Contents of 00000000000000000002.json in the _delta_log directory:
{"commitInfo":{"timestamp":1611828239222,"operation":"UPDATE","operationParameters":{"predicate":"(id#395 > 2)"},"readVersion":4,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numAddedFiles":"1","numUpdatedRows":"2","numCopiedRows":"2"}}}
{"remove":{"path":"part-00000-cb4cdcc9-5e71-4f45-ae38-01c1a526e3c4-c000.snappy.parquet","deletionTimestamp":1611828238847,"dataChange":true}}
{"add":{"path":"part-00000-cf10bc40-bd4b-4a41-9897-2fc0fe2cfe09-c000.snappy.parquet","partitionValues":{},"size":886,"modificationTime":1611828239000,"dataChange":true}}
4. Deleting data
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object DeltaDelete {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs must be set either here or at spark-submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .appName("delete")
      .master("local[*]").getOrCreate()
    // load the path as a DeltaTable
    val ddf = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")
    ddf.delete(col("id") === 2)
    val df = ddf.toDF
    df.show()
    // note: delete() already commits to the table, so this write adds an extra commit on top
    df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/Users/smzdm/Documents/data/0")
    spark.stop()
  }
}
-----------------------------------
+---+---+----+
| id| rn|flag|
+---+---+----+
| 1| 1| 1|
| 13| 3| 0|
| 14| 4| 0|
+---+---+----+
Contents of 00000000000000000003.json in the _delta_log directory:
{"commitInfo":{"timestamp":1611828364666,"operation":"DELETE","operationParameters":{"predicate":"[\"(`id` = 2)\"]"},"readVersion":6,"isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numDeletedRows":"1","numAddedFiles":"1","numCopiedRows":"3"}}}
{"remove":{"path":"part-00000-3067922b-8702-49df-a903-aa778462d95b-c000.snappy.parquet","deletionTimestamp":1611828364655,"dataChange":true}}
{"add":{"path":"part-00000-e2f90118-2525-4614-8015-42c2c2545018-c000.snappy.parquet","partitionValues":{},"size":876,"modificationTime":1611828364000,"dataChange":true}}
5. Table history and time travel
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

/**
 * View the table history
 * Time-travel to a historical version
 * Clean up old versions
 */
object DeltaDataVersion {
  def main(args: Array[String]): Unit = {
    // version to travel back to
    val versionCode = 1
    // val versionCode = args(0).trim
    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs must be set either here or at spark-submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .appName("version")
      .master("local[*]").getOrCreate()
    val df = DeltaTable.forPath(spark, "/Users/smzdm/Documents/data/0")
    // show the history
    df.history().toDF().show(false)
    // time-travel either by version number (versionAsOf) or by timestamp (timestampAsOf)
    val dff = spark.read.format("delta").option("versionAsOf", versionCode).load("/Users/smzdm/Documents/data/0")
    // val dff = spark.read.format("delta").option("timestampAsOf", "2021-01-28 18:03:59").load("/Users/smzdm/Documents/data/0")
    // remove files older than 100 hours; with no argument the default retention is 7 days (files older than 7 days are removed).
    // Retaining less than the default requires disabling spark.databricks.delta.retentionDurationCheck.enabled.
    df.vacuum(100)
    dff.show()
    spark.stop()
  }
}
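Delta 0.7.0 has no RESTORE command yet, so rolling the table back to an old version usually means reading that version and overwriting the current table. A minimal sketch, assuming version 1 is the target:
// read the historical version...
val rollback = spark.read.format("delta")
  .option("versionAsOf", 1)
  .load("/Users/smzdm/Documents/data/0")

// ...and write it back; this creates a new commit whose content equals version 1
rollback.write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .save("/Users/smzdm/Documents/data/0")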
6. Merging two tables
The merge tests took the longest and hit the most edge cases, so the figures here may not be completely accurate.
The previous post mentioned automatic schema evolution; it is tested here as well. A few things to note:
1. A merge fails when one target row matches multiple source rows (a Cartesian-product style match); the source data must be de-duplicated beforehand (see the sketch after this list).
2. Enable automatic schema evolution only when you actually need it.
3. A merge is triggered by calling execute(); there is no need to write the table back afterwards.
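For note 1, one way to pre-process the source is to keep exactly one row per join key before the merge. A rough sketch (sourceDf and the ordering column are placeholders for illustration):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// keep only the latest source row per id, so every target row matches at most one source row
val dedupedSource = sourceDf
  .withColumn("_rank", row_number().over(Window.partitionBy("id").orderBy(col("rn").desc)))
  .filter(col("_rank") === 1)
  .drop("_rank")
The full merge test case: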
import io.delta.tables._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Merge operations between two tables
 * merge + update: used with whenMatched(condition); only matched rows are updated
 * merge + insert: used with whenNotMatched(condition); inserts new rows
 *
 * Combined: merge + update + insert gives upsert semantics (update if present, insert if not)
 *
 * Tests:
 * update: passed
 * insert: passed
 * Cartesian product: fails; the keys must be pre-processed first
 * merging two tables with different schemas, with and without automatic schema evolution
 *
 */
object DeltaMerge {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // these two configs must be set either here or at spark-submit time
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .config("spark.databricks.delta.schema.autoMerge.enabled", true) // enable automatic schema evolution (optional)
      .appName("merge")
      .master("local[*]")
      .getOrCreate()
    val ddf = DeltaTable.forPath(spark, "DeltaTable").toDF.withColumn("rn", lit(1)).withColumn("flag", lit(100))
    val ddf1 = DeltaTable.forPath("/Users/smzdm/Documents/data/0")
    // DeltaTable.forName("tableName") // a metastore table can also be looked up by name
    val df = ddf.withColumn("rn", col("rn") + 2)
      .withColumn("A", lit("a"))
      .withColumn("B", lit("B"))
    print("=============t====================")
    ddf.toDF.show()
    print("----------------t1-----------------")
    ddf1.toDF.show()
    print("==============df===================")
    df.show()
    // merge + update: used with whenMatched(condition); Map(target column -> source expression)
    ddf1.as("t1")
      .merge(ddf.as("t"), "t.id = t1.id")
      .whenMatched("t.id = 4")
      .updateExpr(Map("t1.rn" -> "t.rn")) // only update rows whose id matched
      // .updateAll() // update every matched row
      .execute()
    // merge + insert: used with whenNotMatched(condition); Map(target column -> source expression)
    ddf1.as("t1")
      .merge(ddf.as("t"), "t.id = t1.id")
      .whenNotMatched("t.id = 4") // with an extra condition
      .insertExpr(Map("t1.rn" -> "t.rn")) // combined with the update above this gives upsert behaviour
      // .whenNotMatched()
      // .insertAll() // insert every unmatched row
      .execute()
    // Cartesian-product test: this fails. As the docs point out, the keys must be de-duplicated
    // beforehand; when several source rows match one target row, Delta cannot tell which one to apply.
    ddf1.as("t1")
      .merge(df.as("t"), "t.id = t1.id")
      .whenMatched()
      .updateAll()
      .whenNotMatched()
      .insertAll()
      .execute()
    ddf1.toDF.show()
    spark.stop()
  }
}
7. Streaming and batch on the same table
Still testing...
Summary
To be continued...