
[Spark Streaming] Inside RDD Generation and Spark Job Execution: A Source-Code Walkthrough


This post covers the following topics:

  • A hands-on demonstration of a DStream producing RDDs
  • How a DStream works as a template for RDDs
  • A source-level look at how common DStreams generate RDDs

These common DStreams fall into three categories: input-level DStreams (subclasses of InputDStream, such as SocketInputDStream), transformation-level DStreams (such as MappedDStream), and output-level DStreams (ForEachDStream). In the word-count job below they appear as SocketInputDStream, FlatMappedDStream/MappedDStream/ShuffledDStream, and ForEachDStream.

The main code for this post is as follows:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// StreamingExamples (a log-level helper) lives in Spark's examples package

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 120-second batch interval
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(120))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note: no replication in the storage level when running locally;
    // replication is needed in a distributed setting for fault tolerance.
    val lines = ssc.socketTextStream("master", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println
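To drive the example, run nc -lk 9999 on the master host and type a few words, as the comment above suggests; with the 120-second batch interval, the word counts are printed once every two minutes.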

Running the job on the cluster produces log output like the following:

16/09/08 09:18:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 51 ms on localhost (1/1)
16/09/08 09:18:00 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
16/09/08 09:18:00 INFO scheduler.JobScheduler: Finished job streaming job 1473297480000 ms.0 from job set of time 1473297480000 ms
16/09/08 09:18:00 INFO scheduler.JobScheduler: Total delay: 0.927 s for time 1473297480000 ms (execution: 0.670 s)
16/09/08 09:18:00 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/09/08 09:18:00 INFO scheduler.InputInfoTracker: remove old batch metadata: 
16/09/08 09:18:15 INFO storage.MemoryStore: Block input-0-1473297495000 stored as bytes in memory (estimated size 16.0 B, free 89.8 KB)
16/09/08 09:18:15 INFO storage.BlockManagerInfo: Added input-0-1473297495000 in memory on localhost:53535 (size: 16.0 B, free: 511.1 MB)
16/09/08 09:18:15 WARN storage.BlockManager: Block input-0-1473297495000 replicated to only 0 peer(s) instead of 1 peers
16/09/08 09:18:15 INFO receiver.BlockGenerator: Pushed block input-0-1473297495000
16/09/08 09:20:00 INFO scheduler.JobScheduler: Starting job streaming job 1473297600000 ms.0 from job set of time 1473297600000 ms
16/09/08 09:20:00 INFO scheduler.JobScheduler: Added jobs for time 1473297600000 ms
16/09/08 09:20:00 INFO spark.SparkContext: Starting job: print at NetWorkWordCount.scala:24
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Registering RDD 7 (map at NetWorkWordCount.scala:23)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Got job 3 (print at NetWorkWordCount.scala:24) with 1 output partitions
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Final stage: ResultStage 6 (print at NetWorkWordCount.scala:24)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 5)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 5 (MapPartitionsRDD[7] at map at NetWorkWordCount.scala:23), which has no missing parents
16/09/08 09:20:00 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.6 KB, free 92.4 KB)
16/09/08 09:20:00 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1604.0 B, free 94.0 KB)
16/09/08 09:20:00 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:53535 (size: 1604.0 B, free: 511.1 MB)
16/09/08 09:20:00 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 5 (MapPartitionsRDD[7] at map at NetWorkWordCount.scala:23)
...


16/09/08 09:20:00 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/09/08 09:20:00 INFO executor.Executor: Finished task 0.0 in stage 8.0 (TID 5). 1307 bytes result sent to driver
16/09/08 09:20:00 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 8.0 (TID 5) in 43 ms on localhost (1/1)
16/09/08 09:20:00 INFO scheduler.DAGScheduler: ResultStage 8 (print at NetWorkWordCount.scala:24) finished in 0.028 s
16/09/08 09:20:00 INFO scheduler.DAGScheduler: Job 4 finished: print at NetWorkWordCount.scala:24, took 0.119611 s
-------------------------------------------
Time: 1473297600000 ms
-------------------------------------------
(love,1)
(ge,1)
(i,1)

From these logs we can draw the following conclusions:
(1) A streaming program is first and foremost a Spark application, and only then a streaming program.
(2) When the program starts, Spark Core's scheduling system launches the relevant components and pre-allocates resources. In other words, before Spark Streaming runs, the maximum usable resources have already been allocated; dynamic resource allocation is also possible, and multiple threads can submit and execute jobs concurrently.
(3) Spark Streaming depends on Spark Core: if DStreams did not produce RDDs, how could the program run on Spark Core at all? (DStream wraps RDD because Spark aims to be a unified, multi-paradigm big-data platform with a single, consistent API. With a unified abstraction such as Dataset and one underlying engine shared by all computation paradigms, development cost drops dramatically.)

Let's keep reading the logs:

16/09/08 09:16:18 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.MappedDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Checkpoint interval = null
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Remember duration = 120000 ms
16/09/08 09:16:18 INFO dstream.SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@...
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Checkpoint interval = null
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Remember duration = 120000 ms
16/09/08 09:16:18 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@...
16/09/08 09:16:18 INFO dstream.MappedDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.MappedDStream: Checkpoint interval = null
16/09/08 09:16:18 INFO dstream.MappedDStream: Remember duration = 120000 ms
16/09/08 09:16:18 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@...
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Checkpoint interval = null
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Remember duration = 120000 ms
16/09/08 09:16:18 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@...
16/09/08 09:16:18 INFO dstream.ForEachDStream: Slide time = 120000 ms
16/09/08 09:16:18 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/09/08 09:16:18 INFO dstream.ForEachDStream: Checkpoint interval = null

From these logs we can conclude:
The DStream lineage is built and traced from back to front (ForEachDStream → ShuffledDStream → MappedDStream → FlatMappedDStream → SocketInputDStream), while at runtime the job executes from front to back, input to output.

The next question: why is a DStream a template for RDDs?

**The back-to-front chain of DStream dependencies is called the DStreamGraph, and it is the template for the RDD DAG.
The logs do not show the RDD contents, so to observe the RDDs themselves we turn to the web UI. An RDD is bound to a batch time: only at a given time does the RDD come into existence.**

(Web UI screenshots: the job DAGs generated for two different batch times.)

The different timestamps mark different points in "time-space": the same DStream template, instantiated at different batch times, yields different RDDs.

object DStream {

  // `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
  // it automatically. However, we still keep the old function in StreamingContext for backward
  // compatibility and forward to the following function directly.

  // The implicit conversion from DStream[(K, V)] to PairDStreamFunctions, which is
  // what provides reduceByKey and the other pair operations.
  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
    PairDStreamFunctions[K, V] = {
    new PairDStreamFunctions[K, V](stream)
  }
}
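This conversion is exactly what makes reduceByKey available in the word-count code above; conceptually the compiler expands the call as follows (a sketch for illustration):

// With the implicit in scope, pair operations appear directly on DStream[(K, V)]:
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
// which desugars (conceptually) to:
val wordCounts2 = DStream.toPairDStreamFunctions(words.map(x => (x, 1))).reduceByKey(_ + _)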

Now back to the claim that a DStream is a template for RDDs:

1. So far the argument was based on observed behavior; next we back it with the source code.
2. We have observed two things so far: (1) the DStream graph holds back-to-front dependencies; (2) each batch run builds an RDD DAG that looks like a copy of the DStream graph. The next step is to see how the InputDStream behind socketTextStream produces RDDs. SocketInputDStream itself shows no trace of an RDD, so we look at its parent class, which defines compute(validTime): given a time it returns an RDD, and only for that specific batch time.

Next, the compute method of ReceiverInputDStream, the parent class of SocketInputDStream:

 /**
   * Generates RDDs with blocks received by the receiver of this stream.
   */
  // Note: compute returns an Option[RDD], produced for one specific batch time.

  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        // (Why produce anything at all? After a driver restart the timer may already
        // be running before initialization has finished; batch times cannot be
        // skipped, so an empty RDD is returned for them.)
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD. This code runs on the driver. Where do the blockInfos
        // come from? The receiverTracker on the driver holds the metadata for blocks
        // that were received and stored on executors across the cluster; given
        // validTime and the batch duration, it tells us which blocks belong to this
        // batch. The driver never touches the data itself, only these metadata
        // "pointers" to it: scheduling operates at the level of abstraction, and the
        // driver orchestrates everything through metadata.
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

To round out the picture of how DStreams produce RDDs, let's look at another input DStream, FileInputDStream:

private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    ssc_ : StreamingContext,
    // Watches a directory: at each batch time, check whether new files have arrived
    directory: String,
    filter: Path => Boolean = FileInputDStream.defaultFilter,
    newFilesOnly: Boolean = true,
    conf: Option[Configuration] = None)
    (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
  extends InputDStream[(K, V)](ssc_) {

  private val serializableConfOpt = conf.map(new SerializableConfiguration(_))

Next, FileInputDStream's compute method:

override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    // Find the new files that have appeared as of this batch time
    val newFiles = findNewFiles(validTime.milliseconds)
    logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
    batchTimeToSelectedFiles += ((validTime, newFiles))
    recentlySelectedFiles ++= newFiles
    val rdds = Some(filesToRDD(newFiles))
    // Copy newFiles to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "files" -> newFiles.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    val inputInfo = StreamInputInfo(id, 0, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    // Finally return the RDD for this batch: this is what makes it an input-level DStream
    rdds
  }
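For reference, FileInputDStream is what the file-based input APIs create under the hood; a minimal usage sketch (the directory path is a placeholder):

// textFileStream builds a FileInputDStream[LongWritable, Text, TextInputFormat]
// and maps each record to its text content:
val fileLines = ssc.textFileStream("hdfs://master:9000/streaming/input")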

Now filesToRDD, which turns the batch's new files into a single RDD:

/** Generate one RDD from an array of files */
  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
    val fileRDDs = files.map { file =>
      val rdd = serializableConfOpt.map(_.value) match {
        case Some(config) => context.sparkContext.newAPIHadoopFile(
          file,
          fm.runtimeClass.asInstanceOf[Class[F]],
          km.runtimeClass.asInstanceOf[Class[K]],
          vm.runtimeClass.asInstanceOf[Class[V]],
          config)
        case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
      }
      if (rdd.partitions.size == 0) {
        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
          "files that have been \"moved\" to the directory assigned to the file stream. " +
          "Refer to the streaming programming guide for more details.")
      }
      rdd
    }
    // The InputDStream needs a single RDD per batch, but here we have one RDD per new
    // file, so they are unioned into one UnionRDD
    new UnionRDD(context.sparkContext, fileRDDs)
  }
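The same idea can be reproduced with the public API; a minimal sketch (the file paths are hypothetical and sc: SparkContext is assumed in scope):

// One RDD per file, then a single union RDD for the whole batch:
val files = Seq("hdfs:///data/part-0000", "hdfs:///data/part-0001") // hypothetical paths
val perFileRDDs = files.map(f => sc.textFile(f))
val batchRDD = sc.union(perFileRDDs) // equivalent in spirit to filesToRDD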

Next, a transformation-level DStream:

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration
  // compute again returns an RDD for the given batch time
  override def compute(validTime: Time): Option[RDD[U]] = {
    // The input DStream at the head of the chain produces the RDD; each DStream after
    // it asks its parent for that time's RDD and applies its own transformation. An
    // operation on a DStream is, in substance, an operation on the RDDs it generates.
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

This further confirms that a DStream is a higher-level abstraction on top of Spark Core: operations on DStreams are "virtual", and the operations on the RDDs they generate are what actually run. The toy sketch below makes the pattern concrete.
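To see the "template" pattern in isolation, here is a toy, Spark-free sketch of the same mechanism (all names here are illustrative, not Spark source):

import scala.collection.mutable

// A "stream" is a template that materializes one concrete value per batch time,
// caching the result per time, just like DStream.generatedRDDs.
abstract class ToyDStream[T] {
  private val generated = mutable.HashMap[Long, T]()
  def compute(time: Long): Option[T] // subclass-specific, like DStream.compute
  final def getOrCompute(time: Long): Option[T] =
    generated.get(time).orElse {
      val v = compute(time)
      v.foreach(generated.put(time, _))
      v
    }
}

// Input level: produces the raw batch for a given time.
class ToyInput(data: Long => Seq[String]) extends ToyDStream[Seq[String]] {
  def compute(time: Long): Option[Seq[String]] = Some(data(time))
}

// Transformation level: asks its parent for that time's value and transforms it,
// exactly the shape of MappedDStream.compute above.
class ToyMapped[T, U](parent: ToyDStream[T], f: T => U) extends ToyDStream[U] {
  def compute(time: Long): Option[U] = parent.getOrCompute(time).map(f)
}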

Next, a special, action-level operation on the generated RDDs, print:

/**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
    // The function below operates on the RDD generated for a specific batch time
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        // take is an RDD action, but nothing runs at this point: we are only defining
        // the function, not calling it. Spark Streaming separates definition from
        // execution; this merely declares what to do.
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println("Time: " + time)
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }
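As the last line shows, print is just a thin wrapper over foreachRDD, the general-purpose output operator; any per-batch output logic can be written against it directly, for example (a sketch using the wordCounts stream from above):

// Custom output logic applied to each batch's RDD:
wordCounts.foreachRDD { (rdd, time) =>
  println(s"Batch $time produced ${rdd.count()} distinct words")
}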

Next, the ForEachDStream that foreachRDD creates:

class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // ForEachDStream does not compute an RDD of its own: it is an output DStream, so
  // compute returns None and the real work happens in generateJob below, which is
  // invoked by the JobGenerator, driven by its recurring timer.

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

Next, how the jobs themselves are generated. The JobGenerator is driven by a recurring timer that posts a GenerateJobs event once per batch interval:

 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

// DStreamGraph.generateJobs then asks every registered output stream for a job:
 def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      // the output streams registered with the graph are the ForEachDStreams
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

// JobGenerator.generateJobs runs when the timer's GenerateJobs(time) event fires:
 /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch

      // the ForEachDStreams were registered with the graph as its output streams
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

Putting the pieces together: the timer posts GenerateJobs(time); the JobGenerator allocates the received blocks to the batch and asks the graph's output streams for jobs; each Job wraps a function that, when run, triggers the actual RDD action. Below is DStream's default generateJob, which simply materializes the batch's RDD:

 /**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }
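Note the trick in the default implementation: the RDD is materialized by running a job whose per-partition function does nothing, so every partition is computed but no results are collected. The same pattern with the public API (a sketch assuming sc: SparkContext and rdd: RDD[String] are in scope):

// Force computation of every partition without collecting any results:
val emptyFunc = (iter: Iterator[String]) => {}
sc.runJob(rdd, emptyFunc) // materializes rdd; the output is discarded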

Finally, getOrCompute, defined once on the DStream base class and inherited by every DStream:

  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If the RDD was already generated for this time, retrieve it from the HashMap;
    // otherwise compute it. Every DStream carries this generatedRDDs member.
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.

      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

One RDD per batch time: with our 120-second batch interval, every DStream in the graph gets one RDD every two minutes. Even ForEachDStream takes part in the same machinery, because every DStream carries this member:

 private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

At this point we have seen, end to end, how a DStream produces RDD instances; how a DStream serves as the template for RDDs ((1) the DStream graph holds back-to-front dependencies, and (2) each batch run builds an RDD DAG that mirrors that graph); and, at the source level, how the common DStreams generate their RDDs.

Tags: spark