Structured Streaming
什么是Structured Streaming
泛指使用SQL操作Spark的流处理。Structured Streaming是一个scalable 和fault-tolerant 流处理引擎,该引擎是构建Spark SQL之上。可以使得用户以静态批处理的方式计算流数据。Structured Streaming底层会调用Spark SQL 引擎对流数据做增量和持续的更新计算并且输出最终结果。用户可以使用DataSet/DataFrame API
完成流处理中的常见问题:aggregations-聚合统计、event-time window-事件窗口、stream-to-batch/stream-to-stream join连接等功能。Structured Streaming可以通过 checkpointing (检查点)和 Write-Ahead Logs(写前日志)机制实现end-to-end(端到端)、exactly-once(进准一次)语义容错机制。总之Structured Streaming提供了 快速、可扩展、容错、端到端的精准一次的流处理,无需用户过多的干预。
Structured Streaming底层计算引擎默认采取的是micro-batch
处理引擎(DStream一致的),除此之外Spark还提供了其它的处理模型可供选择:micro-batch-100ms
、Fixed interval micro-batches
、One-time micro-batch
、Continuous Processing-1ms(实验)
’
快速入门
创建Maven工程,引入相关依赖
<properties>
<spark.version>2.4.3</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
编写应用程序
package com.hw.demo01
import org.apache.spark.sql.SparkSession
/**
* @aurhor:fql
* @date 2019/10/10 18:31
* @type: 单词统计案例
*/
object StructurestreamWordCount {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark = SparkSession.builder()
.appName("wordCount")
.master("local[6]")
.getOrCreate()
spark.sparkContext.setLogLevel("FATAL")
import spark.implicits._
//2.通过流的方式创建DataFrame -细化
val lines = spark.readStream
.format("socket") //指定方式
.option("host", "CentOS") //指定主机名
.option("port", 9999) //指定端口号
.load()
//3.执行SQL操作API -细化 窗口等
val wordCounts = lines.as[String].flatMap(_.split("\\s+"))
.groupBy("value").count()
//4.构建StreamQuery 将结果写出去 --细化
val query = wordCounts.writeStream
.outputMode("complete") //表示全量输出,等价于Storm的updateStateByKey
.format("console") //输出到控制台
.start()
//5.关闭流
query.awaitTermination()
}
}
进行测试
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| you| 1|
| how| 1|
| good| 1|
| is| 1|
| are| 1|
| hahh| 1|
| a| 1|
| this| 1|
+------+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| you| 1|
| how| 1|
| good | 1|
| is| 1|
| are| 1|
| hahh| 1|
| a| 1|
| this| 5|
+------+-----+
常规概念
结构化流处理中的关键思想是将实时数据流视为被连续追加的表。将输入数据流视为"Input Table"。流上到达的每个数据项都像是将新的行附加到Input Table中。
对Input Table的查询将生成“Result Table”。每个触发间隔(例如,每一秒钟),新行将附加到Input Table中,最终更新Result Table。无论何时更新Result Table,我们都希望将更改后的结果写入外部接收器(Sink)。
“输出”定义为写到外部存储器的内容。输出支持以下模式的输出:
- Complete Mode(状态):整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。
- Update Mode(状态):自上次触发以来,仅结果表中已更新的行将被写入外部存储(Spark 2.1.1),如果没有聚合该策略等价于Append Mode
- Append Mode(无状态):自上次触发以来,仅追加到结果表中的新行将被写入外部存储。这仅适用于结果表中现有的行预计不会更改的查询。(Append也可以用在含有聚合的查询中,但是仅仅限制在窗口计算-后续讨论)
- 注意:
- Spark并不会存储Input Table的数据,一旦处理完数据之后,就将接收的数据丢弃。Spark仅仅维护的计算的中间结果(状态)
- Structured Stream好处在于无需用户维护计算状态(相比较于Storm流处理),Spark就可以实现end-to-end(端到端),exactly-once(精准一次)语义容错机制。
DataFrames & Datasets 创建
kafka Source
- 引入所需依赖
dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
- 编写应用程序
package com.hw.demo01
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
/**
* @aurhor:fql
* @date 2019/10/10 19:08
* @type:
*/
object StructuredKafkaSource {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark = SparkSession.builder()
.appName("wordCount")
.master("local[5]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("FATAL")
//2.通过流的方式创建DataFrame -细化
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "CentOS:9092")
.option("subscribe", "topic01")
.load()
//3.执行SQL操作 API
val wordCounts = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "partition", "offset", "CAST(timestamp AS LONG)")
.flatMap(row => row.getAs[String]("value").split("\\s+"))
.map((_, 1))
.toDF("word", "num")
.groupBy($"word")
.agg(sum($"num"))
//4.构建StreamQuery 将结果写出去
val query = wordCounts.writeStream
.outputMode(OutputMode.Update())
.format("console")
.start()
//5.关闭流
query.awaitTermination()
}
}
- 进行测试
Batch: 0
-------------------------------------------
+----+--------+
|word|sum(num)|
+----+--------+
+----+--------+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------+
|word|sum(num)|
+----+--------+
|thia| 1|
| ha| 3|
|this| 1|
+----+--------+
-------------------------------------------
Batch: 2
-------------------------------------------
+-----+--------+
| word|sum(num)|
+-----+--------+
|thisa| 1|
| you| 4|
| d | 1|
| have| 1|
|dream| 1|
| I| 1|
| a| 1|
| this| 4|
+-----+--------+
上一篇: Structured Streaming
下一篇: 内部类:局部内部类
推荐阅读
-
用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
-
nginx 0.8.54/1.0.0 在cygwin环境下的编译(包括 nginx_mod_h264_streaming-2.2.7)
-
nginx 0.8.54/1.0.0 在cygwin环境下的编译(包括 nginx_mod_h264_streaming-2.2.7)
-
录音精灵Apowersoft Streaming Audio Recorder安装及激活图文教程(附注册码)
-
Apache 流框架 Flink,Spark Streaming,Storm对比分析(一)
-
Apache 流框架 Flink,Spark Streaming,Storm对比分析(二)
-
使用IIS Live Smooth Streaming技术搭建流媒体直播系统
-
.NET Core + gRPC 实现数据串流 (Streaming)
-
用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
-
Hadoop streaming详细介绍