欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用Structured Streaming和Delta Lake构建实时数仓(入门)

程序员文章站 2022-03-08 14:06:15
...

Delta Lake的批流一体化设计,并且和HDFS天然的兼容,让他具有构建实时数仓的能力。本文介绍通过spark的Structured Streaming结合Delta Lake来构建基本的实时数仓。

基本结构

# 通过struct streaming来读取数据
sdf = spark.readStream.format("")

# 操作数据
sdf2 = sdf.groupBy.count()

# 把数据保存到delta表中
sdf2 = writeStream
				.format("delta")
				.outputMode("append")
				.option()
				.start("")

实战

说明,该实战代码是在pyspark-shell中测试完成。

启动带有Delta Lake的pyspark shell

./bin/pyspark --master local --jars ./delta-core_2.11-0.6.1.jar  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

准备网络流数据源

开启一个终端,并输入以下命令:

$ nc -lk 10002

编写并执行实时数据处理代码

在pyspark-shell中输入以下代码:

import pyspark.sql.functions as F

lines = spark .readStream .format("socket") .option("host", "localhost") .option("port", 10002) .load()
words = lines.select(F.explode(F.split(lines.value, " ") ).alias("word") )
wordCounts = words.groupBy("word").count()

wordCounts.writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/tmp/delta/wordCounts/_checkpoints/streaming-agg") .start("/tmp/delta/wordCounts")

在网络输入终端中,输入一些单词:

$ nc -lk 10002
hello world

通过delta表来读取数据

df = spark.read.format("delta").load("/tmp/delta/wordCounts")
df.show()

可以看到数据已经写入到delta lake表了。

小结

通过这种方式可以把数据实时的写入到hdfs中,可以看到非常简单易用。然后可以使用delta lake的特性来对数据进行处理。

这里要注意的是实时流数据写入到HDFS中时,可以产生很多小文件,幸运的是delta lake提供了很多避免小文件的方式,典型的方法有:文件合并,减少shuffle写的分区数,删除过期的文件等。这些在后面进行介绍。