欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Structured Streaming

程序员文章站 2022-05-22 09:02:21
...

什么是Structured Streaming

泛指使用SQL操作Spark的流处理。Structured Streaming是一个scalable 和fault-tolerant 流处理引擎,该引擎是构建Spark SQL之上。可以使得用户以静态批处理的方式计算流数据。Structured Streaming底层会调用Spark SQL 引擎对流数据做增量和持续的更新计算并且输出最终结果。用户可以使用DataSet/DataFrame API 完成流处理中的常见问题:aggregations-聚合统计、event-time window-事件窗口、stream-to-batch/stream-to-stream join连接等功能。Structured Streaming可以通过 checkpointing (检查点)和 Write-Ahead Logs(写前日志)机制实现end-to-end(端到端)、exactly-once(进准一次)语义容错机制。总之Structured Streaming提供了 快速、可扩展、容错、端到端的精准一次的流处理,无需用户过多的干预。

Structured Streaming底层计算引擎默认采取的是micro-batch处理引擎(DStream一致的),除此之外Spark还提供了其它的处理模型可供选择:micro-batch-100msFixed interval micro-batchesOne-time micro-batchContinuous Processing-1ms(实验)

快速入门

创建Maven工程,引入相关依赖

<properties>
    <spark.version>2.4.3</spark.version>
    <scala.version>2.11</scala.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

编写应用程序

package com.hw.demo01

import org.apache.spark.sql.SparkSession

/**
  * @aurhor:fql
  * @date 2019/10/10 18:31 
  * @type: 单词统计案例
  */
object StructurestreamWordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark = SparkSession.builder()
      .appName("wordCount")
      .master("local[6]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("FATAL")
    import spark.implicits._
    //2.通过流的方式创建DataFrame -细化
    val lines = spark.readStream
      .format("socket")  //指定方式
      .option("host", "CentOS") //指定主机名
      .option("port", 9999) //指定端口号
      .load()

    //3.执行SQL操作API -细化 窗口等
    val wordCounts = lines.as[String].flatMap(_.split("\\s+"))
      .groupBy("value").count()

    //4.构建StreamQuery 将结果写出去 --细化
    val query = wordCounts.writeStream
      .outputMode("complete") //表示全量输出,等价于Storm的updateStateByKey
      .format("console") //输出到控制台
      .start()
    //5.关闭流
    query.awaitTermination()
  }
}

进行测试

Structured Streaming

Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|   you|    1|
|   how|    1|
|  good|    1|
|    is|    1|
|   are|    1|
|  hahh|    1|
|     a|    1|
|  this|    1|
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|   you|    1|
|   how|    1|
| good |    1|
|    is|    1|
|   are|    1|
|  hahh|    1|
|     a|    1|
|  this|    5|
+------+-----+

常规概念

结构化流处理中的关键思想是将实时数据流视为被连续追加的表。将输入数据流视为"Input Table"。流上到达的每个数据项都像是将新的行附加到Input Table中。
Structured Streaming
对Input Table的查询将生成“Result Table”。每个触发间隔(例如,每一秒钟),新行将附加到Input Table中,最终更新Result Table。无论何时更新Result Table,我们都希望将更改后的结果写入外部接收器(Sink)。
Structured Streaming“输出”定义为写到外部存储器的内容。输出支持以下模式的输出:

  • Complete Mode(状态):整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。
  • Update Mode(状态):自上次触发以来,仅结果表中已更新的行将被写入外部存储(Spark 2.1.1),如果没有聚合该策略等价于Append Mode
  • Append Mode(无状态):自上次触发以来,仅追加到结果表中的新行将被写入外部存储。这仅适用于结果表中现有的行预计不会更改的查询。(Append也可以用在含有聚合的查询中,但是仅仅限制在窗口计算-后续讨论)
  • 注意:
    • Spark并不会存储Input Table的数据,一旦处理完数据之后,就将接收的数据丢弃。Spark仅仅维护的计算的中间结果(状态)
    • Structured Stream好处在于无需用户维护计算状态(相比较于Storm流处理),Spark就可以实现end-to-end(端到端),exactly-once(精准一次)语义容错机制。

DataFrames & Datasets 创建

kafka Source

  • 引入所需依赖
dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
  • 编写应用程序
package com.hw.demo01

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
/**
  * @aurhor:fql
  * @date 2019/10/10 19:08 
  * @type:
  */
object StructuredKafkaSource {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark = SparkSession.builder()
      .appName("wordCount")
      .master("local[5]")
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("FATAL")

    //2.通过流的方式创建DataFrame -细化
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "CentOS:9092")
      .option("subscribe", "topic01")
      .load()



    //3.执行SQL操作 API
    val wordCounts = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "partition", "offset", "CAST(timestamp AS LONG)")
      .flatMap(row => row.getAs[String]("value").split("\\s+"))
      .map((_, 1))
      .toDF("word", "num")
      .groupBy($"word")
      .agg(sum($"num"))
    //4.构建StreamQuery 将结果写出去
    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()
   //5.关闭流
    query.awaitTermination()
  }
}

  • 进行测试
    Structured Streaming
Batch: 0
-------------------------------------------
+----+--------+
|word|sum(num)|
+----+--------+
+----+--------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------+
|word|sum(num)|
+----+--------+
|thia|       1|
|  ha|       3|
|this|       1|
+----+--------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+--------+
| word|sum(num)|
+-----+--------+
|thisa|       1|
|  you|       4|
|  d  |       1|
| have|       1|
|dream|       1|
|    I|       1|
|    a|       1|
| this|       4|
+-----+--------+