Spark Structured Streaming

1. Overview

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

In short, Spark Structured Streaming provides fast, reliable, fault-tolerant, end-to-end exactly-once processing of streaming data; it is a stream-processing engine built on top of Spark SQL.

We can still use Spark SQL's Dataset/DataFrame API to process streaming data, in much the same way as Spark SQL batch processing. By default, Structured Streaming runs on Spark's micro-batch execution model, which achieves end-to-end exactly-once semantics with latencies as low as 100 ms. Since Spark 2.3 there is an alternative engine, Continuous Processing, which achieves end-to-end latencies as low as 1 ms with at-least-once semantics.
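
As an illustration, here is a minimal sketch (assuming Spark 2.3+; the object name and options are illustrative) of switching a query onto the Continuous Processing engine via a trigger. The rate source and console sink are used because they are among the few sources/sinks the continuous engine supports, and only map-like operations are allowed.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousProcessingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("continuous processing").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // the rate source generates (timestamp, value) rows at a fixed rate
    val df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

    df.writeStream
      .format("console")
      // "1 second" is the checkpoint interval of the continuous engine, not a micro-batch interval
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()
  }
}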

2. Quick Example

A word-count application built with Spark Structured Streaming

Add the dependencies

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.4</version>
</dependency>

Build the application

package quick

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object WordCountApplication {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark = SparkSession.builder().appName("wordcount").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    // 2. Build a streaming DataFrame from the socket source
    val sourceDF = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()

    val flatMapDF = sourceDF.flatMap(row => row.getString(0).split("\\s"))


    // 3. Apply DataFrame operations or SQL to the DF
    flatMapDF.createOrReplaceTempView("t_word")

    val resultDF = spark.sql("select value,count(value) from t_word group by value")

    // 4. Write out the result
    resultDF
    .writeStream
    .format("console")
    // Complete mode is required here because the result table is an aggregation
    // .outputMode(OutputMode.Append()) // Append emits only newly appended rows; not allowed for this aggregation
    .outputMode(OutputMode.Complete()) // write out the full contents of the result table
    .start() // start the streaming query
    .awaitTermination()
  }
}

-------------------------------------------
+-----+------------+
|value|count(value)|
+-----+------------+
|Hello|           1|
|Spark|           1|
+-----+------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+------------+
|value|count(value)|
+-----+------------+
|Hello|           2|
|Spark|           2|
+-----+------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+------------+
|value|count(value)|
+-----+------------+
|Hello|           3|
|Scala|           1|
|Spark|           2|
+-----+------------+

Start the nc data server

nc -lk 9999

3. Programming Model

The core idea of Structured Streaming is to treat a continuous data stream as a table that is continuously appended to (an unbounded, dynamic input table), so it can be queried with SQL just like a static table; under the hood the engine still uses Spark's micro-batch processing model.


Running a query against the input table produces a result table (ResultTable). When new data arrives in the stream it is appended to the input table, the query is applied again and the result table is updated, and at the end of each trigger the updated results are written out to the external storage system.

The output mode defines how the result table is written to the external storage system. Spark currently supports three output modes:

  • Complete Mode: write the entire result table to the external storage system; suitable for result tables produced by grouped aggregations, not for plain queries
  • Append Mode: write only the rows newly appended to the result table to the external storage system
  • Update Mode: write only the rows of the result table that were updated to the external storage system

Because of the nature of structured stream processing, Spark keeps the intermediate results of the current computation (the result table, which acts like state) in memory. Spark does not store the input table: once the data read from the source has been processed, it is discarded immediately, and the whole aggregation process requires no user intervention.

4. Fault Tolerance

Structured Streaming uses checkpoints (a remote backup of local state) and a write-ahead log (WAL) to record the offset range consumed from the source in each batch, so that on failure the data can be re-read from the source. In addition, it supports idempotent writes for sinks. Together these give Structured Streaming end-to-end exactly-once fault-tolerance semantics.
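
As a sketch of how this is wired up in code (reusing the socket word count from the quick example; the HDFS checkpoint path is hypothetical), enabling recovery only requires pointing the query at a checkpoint location:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object CheckpointRecoverySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("checkpoint recovery").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    val counts = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .flatMap(_.split("\\s"))
      .groupBy("value")
      .count()

    counts.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      // offsets (WAL) and state are persisted here; restarting the query with the same
      // path resumes from the recorded offsets instead of reprocessing from scratch
      .option("checkpointLocation", "hdfs://spark:9000/wordcount_ckpt") // hypothetical path
      .start()
      .awaitTermination()
  }
}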

5. Structured Streaming API

Creating streaming DataFrames and streaming Datasets

Creating DataFrames and Datasets backed by a stream

Input Sources

Details of the built-in sources:

File source
  Options: path: path to the input directory, common to all file formats. maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max). latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false). fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false); with this set to true, the following files would be considered the same file because their filename, "dataset.txt", is the same: "file:///dataset.txt", "s3://a/dataset.txt", "s3n://a/b/dataset.txt", "s3a://a/b/c/dataset.txt". For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/R), e.g. for the "parquet" format see DataStreamReader.parquet(). In addition, some session configurations affect certain file formats; see the SQL Programming Guide for details (e.g., for "parquet", the Parquet configuration section).
  Fault-tolerant: Yes
  Notes: Supports glob paths, but does not support multiple comma-separated paths/globs.

Socket source
  Options: host: host to connect to, must be specified. port: port to connect to, must be specified.
  Fault-tolerant: No

Rate source
  Options: rowsPerSecond (e.g. 100, default: 1): how many rows should be generated per second. rampUpTime (e.g. 5s, default: 0s): how long to ramp up before the generating speed becomes rowsPerSecond; granularities finer than seconds are truncated to integer seconds. numPartitions (e.g. 10, default: Spark's default parallelism): the partition number for the generated rows; the source tries its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.
  Fault-tolerant: Yes

Kafka source
  Options: see the Kafka Integration Guide.
  Fault-tolerant: Yes
File (file-system based source)

Multiple file formats are supported: text, json, csv, parquet, etc. When files in these formats are placed into the monitored directory, the system reads them as a stream.

package sources

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object FileSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("file source").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    // Build a streaming DataFrame over a file-system directory
    val df = spark
      .readStream
      .format("text") // or json / csv / parquet / orc
      .load("hdfs://spark:9000/data") // an HDFS or local-FS path

    df.createOrReplaceTempView("t_word")

    val df2 = spark.sql("select * from t_word")

    df2
      .writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .start()
      .awaitTermination()
  }
}

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------+
|        value|
+-------------+
|welcome to bj|
|welcome to bj|
|welcome to tj|
|welcome to tj|
|welcome to tj|
+-------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+
|   value|
+--------+
|  1,可乐|
|  2,薯片|
|3,方便面|
+--------+

Kafka (Kafka-based streaming source)

Reading data from a Kafka topic

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.4.4</version>
</dependency>

package sources

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

// Kafka source
object KafkaSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka source").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    // Build a streaming DataFrame over a Kafka topic
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers","spark:9092")
      .option("subscribe","streams")
      .load()

    df.selectExpr("CAST(key as STRING)","CAST(value as STRING)","CAST(topic as STRING)","CAST(offset as LONG)")
      .createOrReplaceTempView("t_kafka")

    val df2 = spark.sql("select * from t_kafka")

    df2
      .writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .start()
      .awaitTermination()
  }
}

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+-----+------+
|key|value|topic|offset|
+---+-----+-----+------+
+---+-----+-----+------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----------+-------+------+
| key|      value|  topic|offset|
+----+-----------+-------+------+
|null|Hello Kafka|streams|     1|
+----+-----------+-------+------+

Batch processing based on Kafka
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Note: in """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}"""

  • -2 means earliest
  • -1 means latest

Socket (for testing)

The socket source is the one used in the quick example above; it is intended for local testing only and is not fault-tolerant.

Rate (for testing)

package sources

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
// Generates rows at a fixed rate
object RateSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rate source").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    // Build a streaming DataFrame from the rate source
    val df = spark
      .readStream
      .format("rate") // generates (timestamp, value) rows at a fixed rate
      .load() // the rate source needs no path

    df.createOrReplaceTempView("t_word")

    val df2 = spark.sql("select * from t_word")

    df2
      .writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .start()
      .awaitTermination()
  }
}

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2020-02-13 16:03:...|    0|
+--------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2020-02-13 16:03:...|    1|
+--------------------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2020-02-13 16:03:...|    2|
+--------------------+-----+

Output Sinks

File Sink
  Supported output modes: Append
  Options: path: path to the output directory, must be specified. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R), e.g. for the "parquet" format see DataFrameWriter.parquet().
  Fault-tolerant: Yes (exactly-once)
  Notes: Supports writes to partitioned tables. Partitioning by time may be useful.

Kafka Sink
  Supported output modes: Append, Update, Complete
  Options: see the Kafka Integration Guide.
  Fault-tolerant: Yes (at-least-once)
  Notes: More details in the Kafka Integration Guide.

Foreach Sink
  Supported output modes: Append, Update, Complete
  Options: none
  Fault-tolerant: Yes (at-least-once)
  Notes: More details in the next section.

ForeachBatch Sink
  Supported output modes: Append, Update, Complete
  Options: none
  Fault-tolerant: Depends on the implementation
  Notes: More details in the next section.

Console Sink
  Supported output modes: Append, Update, Complete
  Options: numRows: number of rows to print every trigger (default: 20). truncate: whether to truncate the output if too long (default: true).
  Fault-tolerant: No

Memory Sink
  Supported output modes: Append, Complete
  Options: none
  Fault-tolerant: No; but in Complete mode, a restarted query will recreate the full table.
  Notes: The table name is the query name.
File Sink
df2
.writeStream
.format("csv")  // json csv text orc parquet等文本格式
.outputMode(OutputMode.Append()) // Append默认输出模式
.option("checkpointLocation","hdfs://spark:9000/sss_v1") // 检查点路径(HDFS) 主要用以容错处理
.start("/Users/gaozhy/data/170") //HDFS PATH或者 Local FS PATH
.awaitTermination()
Kafka Sink

http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

// kafka sink
// Create the result topic: bin/kafka-topics.sh --create --topic streams_result --zookeeper spark:2181 --partitions 1 --replication-factor 1
// Start a consumer on the result topic: bin/kafka-console-consumer.sh --topic streams_result --bootstrap-server spark:9092 --property print.key=true
val df2 = spark.sql("select value as key,count(value) as value from t_kafka group by value")

df2
.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)") // 查询结果表 key和value的值进行格式转换
.writeStream
.format("kafka")
.outputMode(OutputMode.Complete())
.option("checkpointLocation","hdfs://spark:9000/sss_v2")
.option("kafka.bootstrap.servers","spark:9092")
.option("topic","streams_result")
.start()
.awaitTermination()
Foreach Sink
// foreach sink
val df2 = spark.sql("select value as key,count(value) as value from t_kafka group by value")
df2
.writeStream
.outputMode(OutputMode.Complete())
.foreach(new ForeachWriter[Row] {
  /**
   * Called once per partition when the writer is opened
   *
   * @param partitionId partition index
   * @param epochId     identifier of the data (epoch)
   * @return true to process this partition's data; process() is only called when open() returns true
   */
  override def open(partitionId: Long, epochId: Long): Boolean = true

  /**
   * Processes one row of the result table
   *
   * @param value a row of the ResultTable
   */
  override def process(value: Row): Unit = {
    // output logic for the row
    val word = value.getString(0)
    val num = value.getLong(1)
    println(word + "\t" + num)
  }

  /**
   * Called when the writer is closed (with the error, or null if none)
   *
   * @param errorOrNull the error that stopped processing, or null
   */
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull != null) errorOrNull.printStackTrace()
  }
})
.start()
.awaitTermination()
Console Sink (omitted)
Memory Sink (omitted)
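
The ForeachBatch Sink listed in the table above has no example in this article; the following is a minimal sketch (assuming Spark 2.4+, with a hypothetical output path) that hands each micro-batch of the word-count result table to an ordinary batch writer:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.OutputMode

object ForeachBatchSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("foreachBatch sink").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    val counts = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .flatMap(_.split("\\s"))
      .groupBy("value")
      .count()

    counts.writeStream
      .outputMode(OutputMode.Complete())
      // each trigger delivers the current result table plus a batch id to arbitrary batch code
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.write
          .mode("overwrite")
          .json(s"/tmp/wordcount/batch-$batchId") // hypothetical output path
      }
      .start()
      .awaitTermination()
  }
}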

Query syntax

The SQL syntax is the same as for Spark SQL batch processing.
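
For instance, the SQL word count from the quick example could equally be expressed with the DataFrame API; a small sketch reusing spark and flatMapDF from that example:

// SQL formulation (as in the quick example)
flatMapDF.createOrReplaceTempView("t_word")
val bySql = spark.sql("select value, count(value) as count from t_word group by value")

// equivalent DataFrame API formulation
val byApi = flatMapDF.groupBy("value").count()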

Window On EventTime

Structured Streaming performs window computations based on event time. The overall idea is very similar to a grouped aggregation: records are first assigned to windows according to their event time, and the data in each window is then analyzed.


package window

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object WindowOnEventTimeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("window operation").master("local[*]").getOrCreate()
    val df = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    spark.sparkContext.setLogLevel("ERROR")
    // Windowed computation over (word, timestamp) pairs
    // Input format: word,timestamp
    // e.g. Hello,1581652800000  -> 2020-02-14 12:00:00
    //      World,1581652801000  -> 2020-02-14 12:00:01
    import spark.implicits._
    val convertDF = df
      .map(row => {
        val line = row.getString(0)
        val arr = line.split(",")
        val word = arr(0)
        val ts = arr(1)
        // convert the Long epoch value into a Timestamp object
        val timestamp = new Timestamp(ts.toLong)
        (word, timestamp)
      })
      .toDF("word", "ts")

    import org.apache.spark.sql.functions._
    val query = convertDF
      // window() defines how windows are assigned (arg 1: timestamp column, arg 2: window size, arg 3: slide interval)
      .groupBy(
        window($"ts", "10 seconds", "5 seconds"),
        $"word"
      )
      .count()
    /*
    root
     |-- window: struct (nullable = true)
     |    |-- start: timestamp (nullable = true)
     |    |-- end: timestamp (nullable = true)
     |-- word: string (nullable = true)
     |-- count: long (nullable = false)
     */

    query
      .map(row => {
        val window = row.getStruct(0)
        val start = window.getTimestamp(0)
        val end = window.getTimestamp(1)
        val sdf = new SimpleDateFormat("HH:mm:ss")
        (sdf.format(start), sdf.format(end), row.getString(1), row.getLong(2))
      })
      .toDF("start","end","word","count")
      .writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .start()
      .awaitTermination()
  }
}

Notes:

  • Structured Streaming assigns windows based on event time, so the DataFrame must contain a field of type Timestamp
  • Each window is a half-open interval: closed at the start, open at the end
  • Records are first assigned to windows by event time; if a grouped aggregation is needed, the grouping columns are applied on top of the window

Handling Late Data and Watermarking

By default, Spark aggregates every record that falls into a time window. Since event-time windows are based on the timestamps carried by the data, an element may arrive long after its window has already been triggered; Spark then has to fold the late record into the already-triggered window and recompute it. However, in a long-running streaming job, if the lifetime of window state is never bounded, Spark would have to keep all window state in memory indefinitely, which is not practical. Spark therefore provides a mechanism called a watermark that limits how long intermediate results are kept in memory, so the engine can drop the state of windows that have already been finalized. Data that arrives for such a window after its end time is treated as late data. The watermark is computed as (max event time seen by the engine) - (lateness threshold); once the watermark passes a window's end time, that window's result can be discarded, and any data that subsequently falls into the discarded window is dropped rather than used to update it.


The watermark is computed as
wm = max event time - threshold
For example, with a 10-second threshold, if the largest event time seen so far is 12:00:30, then wm = 12:00:20, and any window whose end time is at or before 12:00:20 can be finalized.

What the watermark does

  • Decides whether late data is still valid: data within the watermark participates in the window computation, while data beyond the watermark is treated as expired and dropped
  • Keeps the memory footprint of the computation within a reasonable range: once a window's end time <= the watermark, the window's data is considered to have fully arrived; after the window's result is written to the external storage system, its entry in the ResultTable state is discarded
package window

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object WaterMarkingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("window operation").master("local[*]").getOrCreate()
    val df = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    spark.sparkContext.setLogLevel("ERROR")
    // Windowed computation over (word, timestamp) pairs
    // Input format: word,timestamp
    // e.g. Hello,1581652800000  -> 2020-02-14 12:00:00
    //      World,1581652801000  -> 2020-02-14 12:00:01
    import spark.implicits._
    val convertDF = df
      .map(row => {
        val line = row.getString(0)
        val arr = line.split(",")
        val word = arr(0)
        val ts = arr(1)
        // convert the Long epoch value into a Timestamp object
        val timestamp = new Timestamp(ts.toLong)
        (word, timestamp)
      })
      .toDF("word", "ts")

    import org.apache.spark.sql.functions._
    val query = convertDF
      // Set the watermark
      // arg 1: name of the event-time column, arg 2: lateness threshold
      .withWatermark("ts","10 seconds")
      // window() defines how windows are assigned (arg 1: timestamp column, arg 2: window size, arg 3: slide interval)
      .groupBy(
        window($"ts", "10 seconds", "5 seconds"),
        $"word"
      )
      .count()
    /*
    root
     |-- window: struct (nullable = true)
     |    |-- start: timestamp (nullable = true)
     |    |-- end: timestamp (nullable = true)
     |-- word: string (nullable = true)
     |-- count: long (nullable = false)
     */

    query
      .map(row => {
        val window = row.getStruct(0)
        val start = window.getTimestamp(0)
        val end = window.getTimestamp(1)
        val sdf = new SimpleDateFormat("HH:mm:ss")
        (sdf.format(start), sdf.format(end), row.getString(1), row.getLong(2))
      })
      .toDF("start", "end", "word", "count")
      .writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .start()
      .awaitTermination()
  }
}

Notions of time in stream processing

  • Event time: the time at which the data itself was produced
  • Ingestion time: the time at which the data was written to the streaming storage system; for Kafka this is the record's timestamp (see the sketch below)
  • Processing time: the time at which the streaming application processes the data
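
As an illustration of ingestion time, here is a minimal sketch (reusing the Kafka broker and topic from the earlier example) that reads each record's Kafka timestamp alongside its value:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object IngestionTimeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ingestion time").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // the Kafka source exposes each record's broker timestamp as a `timestamp` column
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark:9092") // broker address from the earlier example
      .option("subscribe", "streams")
      .load()
      .selectExpr("CAST(value AS STRING)", "timestamp") // ingestion time, not event time

    df.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination()
  }
}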

With a watermark enabled, the two output modes behave differently

  • Append mode: a window's result is only emitted once the watermark >= the window's end time

  • Update mode: every incoming record immediately updates the result table; once the watermark has passed a window's end time, data arriving late for that window is dropped


6. Join Operations

Spark Structured Streaming supports not only joins between a stream and a static batch dataset, but also joins between two streams;

  • Stream - Batch Join: available since version 2.0
  • Stream - Stream Join: available since version 2.3

Stream&Batch Join

A join between a data stream and a static batch dataset

package join

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object StreamAndBatchJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("join operation").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    // Build a data stream
    // orders: order_id,product,price,num,user_id
    val orderInfoDF = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()


    import spark.implicits._
    // Static batch dataset
    val userInfoDF = spark.sparkContext.makeRDD(List((1, "zs"), (2, "ls"), (3, "ww"))).toDF("u_id", "name")

    // join
    orderInfoDF
      .map(row => {
        val arr = row.getAs[String]("value").split(",")
        (arr(0), arr(1), arr(2), arr(3), arr(4))
      })
      .toDF("order_id", "product", "price", "num", "user_id")
      // join each order with the user who placed it
      .join(userInfoDF, $"user_id" === $"u_id", "left_outer")
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination()
  }
}

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+--------+-----+---+-------+----+----+
|order_id| product|price|num|user_id|u_id|name|
+--------+--------+-----+---+-------+----+----+
|     101|iphone11| 4999|  1|      2|   2|  ls|
+--------+--------+-----+---+-------+----+----+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------+-------------+-----+---+-------+----+----+
|order_id|      product|price|num|user_id|u_id|name|
+--------+-------------+-----+---+-------+----+----+
|     102|huawei mate30| 3999|  2|      1|   1|  zs|
+--------+-------------+-----+---+-------+----+----+

Stream&Stream Join

A join between one data stream and another data stream

Requirements:

  • Both streams must declare a watermark (optional for inner joins, required for the other join types), so the engine knows the maximum delay of the input
  • Define an event-time constraint between the two inputs, so the engine can determine when old rows of one input are no longer needed to match against the other input

Time Range Join Conditions

Join conditions based on a time range

Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR)

package join

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object StreamAndStreamJoinWithTimeRange {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("join operation").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    // Build a data stream
    // orders: order_id,product,price,num,user_id,create_time
    // e.g. 101,iphone11,4999,1,1,1581909720000  -> 11:22:00
    val orderInfoDF = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .map(row => {
        // order line => Tuple6
        val arr = row.getAs[String]("value").split(",")
        (arr(0), arr(1), arr(2), arr(3), arr(4), new Timestamp(arr(5).toLong))
      })
      .toDF("order_id", "product", "price", "num", "user_id", "create_time")
      // optional for inner joins: buffer order data for 3 seconds
      .withWatermark("create_time", "3 seconds")

    // Second data stream
    // user info: u_id,name,login_time
    // e.g. 1,zs,1581909720000
    val userInfoDF = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 8888)
      .load()
      .map(row => {
        val arr = row.getAs[String]("value").split(",")
        (arr(0), arr(1), new Timestamp(arr(2).toLong))
      })
      .toDF("u_id", "name", "login_time")
      // optional for inner joins: buffer user data for 2 seconds
      .withWatermark("login_time", "2 seconds")

    import org.apache.spark.sql.functions._
    // Join with a time-range condition: only rows whose times satisfy the range are joined
    // i.e. fetch the orders created within 2 seconds of the user's login:
    //   u_id = user_id and create_time between login_time and login_time + 2 seconds
    // e.g. a login (1,zs,12:00:00) matches orders of user 1 created between 12:00:00 and 12:00:02
    userInfoDF
      .join(orderInfoDF,
        expr(
          """
            | u_id = user_id
            |   and
            | create_time between login_time and login_time + interval 2 seconds
            |""".stripMargin))
      .writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .start()
      .awaitTermination()
  }
}

Test data:
order
nc -lk 9999
101,iphone11,4999,1,1,1581909720000
101,iphone11,4999,1,1,1581909721000
101,iphone11,4999,1,1,1581909722000
101,iphone11,4999,1,1,1581909723000

user

1,zs,1581909720000


-------------------------------------------
Batch: 0
-------------------------------------------
+----+----+----------+--------+-------+-----+---+-------+-----------+
|u_id|name|login_time|order_id|product|price|num|user_id|create_time|
+----+----+----------+--------+-------+-----+---+-------+-----------+
+----+----+----------+--------+-------+-----+---+-------+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+----+----------+--------+-------+-----+---+-------+-----------+
|u_id|name|login_time|order_id|product|price|num|user_id|create_time|
+----+----+----------+--------+-------+-----+---+-------+-----------+
+----+----+----------+--------+-------+-----+---+-------+-----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+
|u_id|name|         login_time|order_id| product|price|num|user_id|        create_time|
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+
|   1|  zs|2020-02-17 11:22:00|     101|iphone11| 4999|  1|      1|2020-02-17 11:22:00|
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+----+----------+--------+-------+-----+---+-------+-----------+
|u_id|name|login_time|order_id|product|price|num|user_id|create_time|
+----+----+----------+--------+-------+-----+---+-------+-----------+
+----+----+----------+--------+-------+-----+---+-------+-----------+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+
|u_id|name|         login_time|order_id| product|price|num|user_id|        create_time|
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+
|   1|  zs|2020-02-17 11:22:00|     101|iphone11| 4999|  1|      1|2020-02-17 11:22:01|
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+----+----+----------+--------+-------+-----+---+-------+-----------+
|u_id|name|login_time|order_id|product|price|num|user_id|create_time|
+----+----+----------+--------+-------+-----+---+-------+-----------+
+----+----+----------+--------+-------+-----+---+-------+-----------+

-------------------------------------------
Batch: 6
-------------------------------------------
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+
|u_id|name|         login_time|order_id| product|price|num|user_id|        create_time|
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+
|   1|  zs|2020-02-17 11:22:00|     101|iphone11| 4999|  1|      1|2020-02-17 11:22:02|
+----+----+-------------------+--------+--------+-----+---+-------+-------------------+

Join on event-time windows

A join built on event-time windows

Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow).

package join

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

/**
 * Requires windows derived from event time
 */
object StreamAndStreamJoinOnEventTimeWindow {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("join operation").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    import org.apache.spark.sql.functions._
    // Build a data stream
    // orders: order_id,product,price,num,user_id,create_time
    // e.g. 101,iphone11,4999,1,1,1581909720000  -> 11:22:00
    val orderInfoDF = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .map(row => {
        // order line => Tuple6
        val arr = row.getAs[String]("value").split(",")
        (arr(0), arr(1), arr(2), arr(3), arr(4), new Timestamp(arr(5).toLong))
      })
      .toDF("order_id", "product", "price", "num", "user_id", "create_time")
      // optional for inner joins: buffer order data for 3 seconds
      //.withWatermark("create_time", "3 seconds")
      // add a window column (start time -> end time)
      .select(window($"create_time","5 seconds"),$"order_id", $"product", $"price", $"num", $"user_id", $"create_time")
      .withColumnRenamed("window","rightWindow")

    // Second data stream
    // user info: u_id,name,login_time
    // e.g. 1,zs,1581909720000
    val userInfoDF = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 8888)
      .load()
      .map(row => {
        val arr = row.getAs[String]("value").split(",")
        (arr(0), arr(1), new Timestamp(arr(2).toLong))
      })
      .toDF("u_id", "name", "login_time")
      //  optional for inner joins: buffer user data for 2 seconds
      //.withWatermark("login_time", "2 seconds")
      .select(window($"login_time","5 seconds"),$"u_id",$"name",$"login_time")
      .withColumnRenamed("window","leftWindow")

    userInfoDF
      .join(orderInfoDF,expr(
        """
          | u_id = user_id and leftWindow = rightWindow
          |""".stripMargin))
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .start()
      .awaitTermination()
  }
}

Support matrix for joins in streaming queries

Static (left) with Static (right)
  All join types: supported, since it is not on streaming data even though it can be present in a streaming query

Stream (left) with Static (right)
  Inner: supported, not stateful
  Left Outer: supported, not stateful
  Right Outer: not supported
  Full Outer: not supported

Static (left) with Stream (right)
  Inner: supported, not stateful
  Left Outer: not supported
  Right Outer: supported, not stateful
  Full Outer: not supported

Stream (left) with Stream (right)
  Inner: supported; optionally specify a watermark on both sides plus time constraints for state cleanup
  Left Outer: conditionally supported; must specify a watermark on the right plus time constraints for correct results, optionally a watermark on the left for all state cleanup
  Right Outer: conditionally supported; must specify a watermark on the left plus time constraints for correct results, optionally a watermark on the right for all state cleanup
  Full Outer: not supported

7. Third-Party Dependencies

How to resolve jar dependency issues when running a Spark application on a cluster

  • Package the dependencies together with the application

    <!--在执行package时候,将scala源码编译进jar--> 
    <plugin>
      <groupId>net.alchim31.maven</groupId> 
      <artifactId>scala-maven-plugin</artifactId> 
      <version>4.0.1</version>
      <executions>
        <execution> 
          <id>scala-compile-first</id> 
          <phase>process-resources</phase> <goals>
          <goal>add-source</goal>
          <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin> <!--将依赖jar打入到jar中--> <plugin>
      <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
    

    Note:

    • When shading third-party dependencies into the jar, exclude the dependencies that the Spark cluster already provides (mark them as provided):
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.4</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql_2.11</artifactId>
     <version>2.4.4</version>
     <scope>provided</scope>
    </dependency>
    
  • Place the jar packages at a designated local path (see the --jars example at the end of this section)

    • Download the dependency jars online with --packages
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
      <version>2.4.4</version>
    </dependency>
    
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,xxx
    
    ../bin/spark-submit --master spark://spark:7077 --class KafkaSourceDemoOnSparkCluster --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 --total-executor-cores 4  sparkstructuredstreaming-1.0-SNAPSHOT.jar
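
    For the option of placing jars at a designated local path, the pre-downloaded jars can instead be passed explicitly with --jars; a hypothetical command (adjust the paths, and also add the transitive dependencies such as kafka-clients to the --jars list):

    ../bin/spark-submit --master spark://spark:7077 --class KafkaSourceDemoOnSparkCluster --jars /opt/jars/spark-sql-kafka-0-10_2.11-2.4.4.jar --total-executor-cores 4 sparkstructuredstreaming-1.0-SNAPSHOT.jar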
    