欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Structured Streaming

程序员文章站 2022-05-22 09:01:33
...

Structured Streaming是Spark新提出的一种实时流的框架,它和Spark Streaming之间有什么区别呢?

一、为什么要有StructedStreaming?

Spark Streaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。它支持从多种数据源获取数据,包括Kafka、Flume、以及TCP socket等,从数据源获取数据之后,可以使用诸如map、reduce和window等高级函数进行复杂算法的处理。最后,还可以将处理结果存储到文件系统和数据库等。Spark Stream处理的数据流图:

Structured Streaming

Spark Streaming的处理机制如下图:
Structured Streaming
Spark Streaming接收流数据,并根据一定的时间间隔拆分成一批批的batch数据,用抽象接口DStream表示(DStream可以看成是一组RDD序列,每个batch对应一个RDD),然后通过Spark Engine处理这些batch数据,最终得到处理后的一批批结果数据。

Structured Streaming是Spark2.0版本提出的新的实时流框架(2.0和2.1是实验版本,从spark2.2开始为稳定版本),相对于Spark Streaming,它的优点如下:

  • 同时支持多种数据源的输入和输出
  • 以结构化的方式操作流式数据,能够使用Spark SQL处理离线的批处理一样,处理流数据,代码更加简洁,写法更加简单
  • 基于Event-Time,相比Spark Streaming的Processing-Time更精确,更符合业务场景
  • 解决了Spark Streaming存在的代码升级,DAG图变化引起的任务失败,无法断点续传的问题(Spark Streaming的硬伤)

二、StructuredStreaming的特性

1,结构化流式处理

a、Structed Streaming将实时流抽象成一张无边界的表,输入的每一条数据当成输入表的一个新行,同时将流式计算的结果映射为另外一张表,完全以结构化的方式去操作流式数据。
Structured Streaming
b、输入的流数据以batch为单位被处理,每个batch会触发一次流式计算,计算结果被更新到Result Table。
如下图:设定batch长度为1s,每一秒从输入源读取数据到Input Table,然后触发Query计算,将结果写入Result Table。

Structured Streaming
c,最后将Result Table的结果写出到外部存储介质(如Kafka)

一共三种OutPut模式:

  • Append模式:只有自上次触发Query计算后,在Result Table表中附加的新行将被写入外部存储器。重点模式,一般使用它
  • Complete模式:将整个更新表写入到外部存储,每次batch触发计算,整张Result Table的结果都会被写出到外部存储介质。
  • Update模式:只有上次触发Query后,在Result Table表中更新的行将被写入外部存储器。注意,这和完全模式不同,因为此模式不输出未更改的行

2,基于Event-Time聚合&延迟数据处理

3,容错性

快速入门----wordCount

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{Dataset, SparkSession}

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("WordCount")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR") //设置日志级别避免影响查看控制台的日志
    import spark.implicits._
    //通过Spark sql 获取到结构流
    val ds: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", "192.168.10.111")
      .option("port", 9999)
      .load()
      .as[String]
    ds //默认dataStreaming返回的是dataframe,我们将其转换为dataset
    //写业务
    val words: Dataset[(String, Long)] = ds.flatMap(_.split("\\s+"))
      .map((_, 1))
      .groupByKey(_._1)
      .count()
    //
    words.writeStream.outputMode(OutputMode.Complete()) //统计全局结果,而不是单个批次
      .format("console")
      .start()
      .awaitTermination() //阻塞线程
  }
}

相关标签: Spark