欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Streaming入门

程序员文章站 2024-02-22 21:22:34
...

Spark Streaming是一个有趣且强大的Spark扩展,它支持流数据或者快速的移动数据的近实时处理。
Spark Streaming实现了一个叫做“微批”(micro-batching)的概念,它将在线/流数据划分成若干明确的微批,每个微批作为一条记录被单独处理。每个微批的输出结果被发送到用户定义的输出流中,且能进一步存入HDFS、NoSQL,或者可以生成实时控制板。
批的大小受限于可接受的延迟,这个延迟从几毫秒到几秒不等。例如,每100ms收集所有Twiiter种子的数据,并作为一个批进行处理。这样系统会创建多个批,每个包含100ms内的Twiiter数据,然后将这些批连续地发给Spark进行处理。

下图所示为Spark Streaming的上层体系结构图:
Spark Streaming入门

输入数据流:这些输入数据源以很快的速度产生数据(每秒、每毫秒甚至更快),然后将这些数据以持续的流数据的形式发送。Spark提供了特定的接口来接受这些数据用以进一步的处理。这些数据流可以被分为基本数据源和高级数据源。
基本数据源:这些数据源的接口是内建的,或者与Spark Streaming打包在一起,使用这些数据源的接口不需要外部库。
高级数据源:高级数据源的接口不在Spark Streaming包里。使用它们的脚本需要链接到或者包含外部库作为脚本依赖。
:批就是一个RDD序列。Spark Streaming提供了一个RDD序列的抽象概念DStream(离散流)。DStream保存了其对应RDD序列的引用,这个RDD序列就是根据输入流或者输入流的转换流生成的。所有作用在DStream上的操作都直接作用在了其对应的RDD上面。
第一个Spark Streaming代码:

import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.dstream.ForEachDStream

/**
  * Created by 13798 on 2017/10/24.
  */
object ScalaFirstStreamingExample {
  def main(args: Array[String]): Unit = {
    println("Creating Spark Configuration")
    //创建一个Spark配置对象
    val conf = new SparkConf()
    conf.setAppName("My first Spark Streaming Application")

    println("Retreiving Streaming Context from Spark Conf")
    //从SparkConf对象获得Streaming上下文,第二个数据是流数据划分成批的时间间隔
    val streamCtx = new StreamingContext(conf, Seconds(5))
    /**
      * 定义Stream的类型,这里我们使用TCP套接字作为文本流
      * 程序持续监听特定机器(localhost)的9087端口。一旦收到数据就存入内存
      * 如果内存不足,就存入磁盘。然后把读到的数据转换成DStream形式
      * 其中StorageLevel.MEMORY_AND_DISK_SER就表示使用这种缓存级别的RDD将存储
      * 在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上只备份1份
      * 而StorageLevel.MEMORY_AND_DISK_SER_2表示使用这种缓存级别的RDD将存储
      * 在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)
      */
    val lines = streamCtx.socketTextStream("localhost", 9087, MEMORY_AND_DISK_SER_2);

    /**
      * 把Split()函数应用到DStream的所有元素上,这样会从原始流的每个记录中产生
      * 多个新的纪录。然后利用flapmap合并这些记录并生成新的DStream
      */
    val words = lines.flatMap(x => x.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    printValues(wordCounts, streamCtx)
    //最重要的一句话,初始化Streaming上下文
    streamCtx.start()
    //等待执行结束
    streamCtx.awaitTermination()
  }

  //简单打印函数,打印RDD的所有元素
  def printValues(stream: DStream[(String, Int)], streamCtx: StreamingContext): Unit = {
    stream.foreachRDD(foreachFunc)
    def foreachFunc = (rdd: RDD[(String, Int)]) => {
      val array = rdd.collect()
      println("------------Start Printing Results-----------------")
      for(res <- array){
        println(res)
      }
      println("------------Finishing Printing Results--------------")
    }
  }

}
相关标签: spark-streaming