Structured Streaming
Structured Streaming是Spark新提出的一种实时流的框架,它和Spark Streaming之间有什么区别呢?
一、为什么要有StructedStreaming?
Spark Streaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。它支持从多种数据源获取数据,包括Kafka、Flume、以及TCP socket等,从数据源获取数据之后,可以使用诸如map、reduce和window等高级函数进行复杂算法的处理。最后,还可以将处理结果存储到文件系统和数据库等。Spark Stream处理的数据流图:
Spark Streaming的处理机制如下图:
Spark Streaming接收流数据,并根据一定的时间间隔拆分成一批批的batch数据,用抽象接口DStream表示(DStream可以看成是一组RDD序列,每个batch对应一个RDD),然后通过Spark Engine处理这些batch数据,最终得到处理后的一批批结果数据。
Structured Streaming是Spark2.0版本提出的新的实时流框架(2.0和2.1是实验版本,从spark2.2开始为稳定版本),相对于Spark Streaming,它的优点如下:
- 同时支持多种数据源的输入和输出
- 以结构化的方式操作流式数据,能够使用Spark SQL处理离线的批处理一样,处理流数据,代码更加简洁,写法更加简单
- 基于Event-Time,相比Spark Streaming的Processing-Time更精确,更符合业务场景
- 解决了Spark Streaming存在的代码升级,DAG图变化引起的任务失败,无法断点续传的问题(Spark Streaming的硬伤)
二、StructuredStreaming的特性
1,结构化流式处理
a、Structed Streaming将实时流抽象成一张无边界的表,输入的每一条数据当成输入表的一个新行,同时将流式计算的结果映射为另外一张表,完全以结构化的方式去操作流式数据。
b、输入的流数据以batch为单位被处理,每个batch会触发一次流式计算,计算结果被更新到Result Table。
如下图:设定batch长度为1s,每一秒从输入源读取数据到Input Table,然后触发Query计算,将结果写入Result Table。
c,最后将Result Table的结果写出到外部存储介质(如Kafka)
一共三种OutPut模式:
- Append模式:只有自上次触发Query计算后,在Result Table表中附加的新行将被写入外部存储器。重点模式,一般使用它
- Complete模式:将整个更新表写入到外部存储,每次batch触发计算,整张Result Table的结果都会被写出到外部存储介质。
- Update模式:只有上次触发Query后,在Result Table表中更新的行将被写入外部存储器。注意,这和完全模式不同,因为此模式不输出未更改的行
2,基于Event-Time聚合&延迟数据处理
3,容错性
快速入门----wordCount
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, SparkSession}
object WordCount {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("WordCount")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR") //设置日志级别避免影响查看控制台的日志
import spark.implicits._
//通过Spark sql 获取到结构流
val ds: Dataset[String] = spark.readStream
.format("socket")
.option("host", "192.168.10.111")
.option("port", 9999)
.load()
.as[String]
ds //默认dataStreaming返回的是dataframe,我们将其转换为dataset
//写业务
val words: Dataset[(String, Long)] = ds.flatMap(_.split("\\s+"))
.map((_, 1))
.groupByKey(_._1)
.count()
//
words.writeStream.outputMode(OutputMode.Complete()) //统计全局结果,而不是单个批次
.format("console")
.start()
.awaitTermination() //阻塞线程
}
}
推荐阅读
-
用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
-
nginx 0.8.54/1.0.0 在cygwin环境下的编译(包括 nginx_mod_h264_streaming-2.2.7)
-
nginx 0.8.54/1.0.0 在cygwin环境下的编译(包括 nginx_mod_h264_streaming-2.2.7)
-
录音精灵Apowersoft Streaming Audio Recorder安装及激活图文教程(附注册码)
-
Apache 流框架 Flink,Spark Streaming,Storm对比分析(一)
-
Apache 流框架 Flink,Spark Streaming,Storm对比分析(二)
-
使用IIS Live Smooth Streaming技术搭建流媒体直播系统
-
.NET Core + gRPC 实现数据串流 (Streaming)
-
用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
-
Hadoop streaming详细介绍