欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark Streaming checkpoint 实现状态的恢复实现

程序员文章站 2024-03-15 12:36:59
...

StreamingContext中有个checkpoint方法,用来恢复state。
先说明下:
从数据角度讲下,Checkpoint是对于状态(state)操作生效。
首先,一般情况下在接收数据并保存时并不放在checkpoint里。
对状态(state)的DStream操作(updateStateByKey),操作会跨多个batch duration,后面数据对前面的有依赖,随着时间的推移,依赖链条会越来越长,这个时候需要使用checkpoint,把这个长链条持久化,成为短链条。

实现代码:

object PullSuppressClosePrice extends Logging{

  //main方法
  def main(args: Array[String]): Unit = {
    startJob()
  }

  def startJob(): Unit ={
    //获取app.conf配置文件信息
    val appConf = ConfigFactory.load("app.conf")
    //stream配置信息
    val streamConfInfo = appConf.getConfig("streaming")

    //初始化SparkContext
    val sc = new SparkContext(new SparkConf().setAppName(appConf.getString("name")))

    //初始化SparkStreamContext
    val ssc = StreamingContext.getOrCreate(streamConfInfo.getString("checkpointDir"),() => {
      streamCreatFun(sc,streamConfInfo.getInt("duration"),streamConfInfo.getString("checkpointDir"))
    })


    logInfo("指标程序启动...")
    ssc.start()
    logInfo("启动完成")

    ssc.awaitTermination()

    logInfo("指标程序关闭...")
    ssc.stop(true)
    logInfo("完成")
  }

  //从checkpoint恢复job上下文或者新建job上下文
  def streamCreatFun(sc : SparkContext, duration : Int, checkpointDir : String) = {
    val context =  new StreamingContext(sc, Seconds(duration))
    context.checkpoint(checkpointDir)
    //TODO    doYourJob() 这里写具体的业务处理
    context
  }

}

StreamingContext.getOrCreate方法 源码展示:
Spark Streaming checkpoint 实现状态的恢复实现

注释的意思大概是:
要么从checkpoint获取一个StreamingContext,要么创建一个新的StreamingContext。
新建StreamingContext是使用传递的[creatingFunc ] 创建的.
hadoopConf 可选参数 Hadoop的配置
createOnError 可选参数 默认情况下,错误会引发异常