使用Structured Streaming和Delta Lake构建实时数仓(入门)
程序员文章站
2022-03-08 14:06:15
...
Delta Lake的批流一体化设计,并且和HDFS天然的兼容,让他具有构建实时数仓的能力。本文介绍通过spark的Structured Streaming结合Delta Lake来构建基本的实时数仓。
基本结构
# 通过struct streaming来读取数据
sdf = spark.readStream.format("")
# 操作数据
sdf2 = sdf.groupBy.count()
# 把数据保存到delta表中
sdf2 = writeStream
.format("delta")
.outputMode("append")
.option()
.start("")
实战
说明,该实战代码是在pyspark-shell中测试完成。
启动带有Delta Lake的pyspark shell
./bin/pyspark --master local --jars ./delta-core_2.11-0.6.1.jar --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
准备网络流数据源
开启一个终端,并输入以下命令:
$ nc -lk 10002
编写并执行实时数据处理代码
在pyspark-shell中输入以下代码:
import pyspark.sql.functions as F
lines = spark .readStream .format("socket") .option("host", "localhost") .option("port", 10002) .load()
words = lines.select(F.explode(F.split(lines.value, " ") ).alias("word") )
wordCounts = words.groupBy("word").count()
wordCounts.writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/tmp/delta/wordCounts/_checkpoints/streaming-agg") .start("/tmp/delta/wordCounts")
在网络输入终端中,输入一些单词:
$ nc -lk 10002
hello world
通过delta表来读取数据
df = spark.read.format("delta").load("/tmp/delta/wordCounts")
df.show()
可以看到数据已经写入到delta lake表了。
小结
通过这种方式可以把数据实时的写入到hdfs中,可以看到非常简单易用。然后可以使用delta lake的特性来对数据进行处理。
这里要注意的是实时流数据写入到HDFS中时,可以产生很多小文件,幸运的是delta lake提供了很多避免小文件的方式,典型的方法有:文件合并,减少shuffle写的分区数,删除过期的文件等。这些在后面进行介绍。