
Delta Lake


Environment setup

# Install or upgrade PySpark (Delta Lake 0.7.0 requires Spark 3.0)
pip install --upgrade pyspark

# Start the PySpark shell with the Delta Lake package and the required configs
pyspark --packages io.delta:delta-core_2.12:0.7.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

# Alternatively, build the SparkSession programmatically
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

from delta.tables import *
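If you run a script instead of the interactive shell, the same --packages and --conf flags apply to spark-submit. A minimal sketch; the script name my_app.py is a placeholder, not part of the original walkthrough:

# my_app.py is a hypothetical script name
spark-submit --packages io.delta:delta-core_2.12:0.7.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  my_app.py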

Create a table

# Write a small DataFrame (ids 0-4) out as a Delta table
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

# Register the files at that path as a metastore table so it can be queried by name
>>> spark.sql("CREATE TABLE IF NOT EXISTS events USING DELTA LOCATION '/tmp/delta-table'")
DataFrame[]
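Before registering a path as a table, you can confirm it actually holds Delta data; a small read-only sketch using the path written above:

# True when the path contains a Delta transaction log
>>> DeltaTable.isDeltaTable(spark, "/tmp/delta-table")
True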

Read data

>>> df = spark.read.format("delta").load("/tmp/delta-table")
>>> df.show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+

>>> spark.sql("SELECT id FROM events").show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+

Update table data

# Atomically replace the table contents with new data (ids 5-9)
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
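Each write creates a new version of the table, so the pre-overwrite rows remain readable via time travel. A sketch, assuming the initial write above produced version 0:

# Read the table as of an earlier version; ids 0-4 are visible again
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()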

Conditional update without overwrite

>>> from delta.tables import *
>>> from pyspark.sql.functions import *
>>> deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

>>> # Add 100 to every row with an even id
>>> deltaTable.update(
...     condition = expr("id % 2 == 0"),
...     set = { "id": expr("id + 100") })

>>> # Delete every row whose id is even
>>> deltaTable.delete(condition = expr("id % 2 == 0"))

>>> # Upsert (merge) new data: update ids that match, insert the rest
>>> newData = spark.range(0, 20)
>>> deltaTable.alias("oldData") \
...     .merge(newData.alias("newData"), "oldData.id = newData.id") \
...     .whenMatchedUpdate(set = { "id": col("newData.id") }) \
...     .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
...     .execute()
>>> deltaTable.toDF().show()
+---+
| id|
+---+
| 9|
| 4|
| 18|
| 17|
| 6|
| 3|
| 8|
| 2|
| 10|
| 7|
| 12|
| 11|
| 1|
| 16|
| 19|
| 15|
| 0|
| 5|
| 13|
| 14|
+---+
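Every operation above is recorded in the table's transaction log, which DeltaTable.history returns as a DataFrame; a quick sketch:

# Show which operation (WRITE, UPDATE, DELETE, MERGE, ...) produced each version
>>> deltaTable.history().select("version", "operation").show()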