Spark Streaming checkpoint 实现状态的恢复实现
程序员文章站
2024-03-15 12:36:59
...
StreamingContext中有个checkpoint方法,用来恢复state。
先说明下:
从数据角度讲下,Checkpoint是对于状态(state)操作生效。
首先,一般情况下在接收数据并保存时并不放在checkpoint里。
对状态(state)的DStream操作(updateStateByKey),操作会跨多个batch duration,后面数据对前面的有依赖,随着时间的推移,依赖链条会越来越长,这个时候需要使用checkpoint,把这个长链条持久化,成为短链条。
实现代码:
object PullSuppressClosePrice extends Logging{
//main方法
def main(args: Array[String]): Unit = {
startJob()
}
def startJob(): Unit ={
//获取app.conf配置文件信息
val appConf = ConfigFactory.load("app.conf")
//stream配置信息
val streamConfInfo = appConf.getConfig("streaming")
//初始化SparkContext
val sc = new SparkContext(new SparkConf().setAppName(appConf.getString("name")))
//初始化SparkStreamContext
val ssc = StreamingContext.getOrCreate(streamConfInfo.getString("checkpointDir"),() => {
streamCreatFun(sc,streamConfInfo.getInt("duration"),streamConfInfo.getString("checkpointDir"))
})
logInfo("指标程序启动...")
ssc.start()
logInfo("启动完成")
ssc.awaitTermination()
logInfo("指标程序关闭...")
ssc.stop(true)
logInfo("完成")
}
//从checkpoint恢复job上下文或者新建job上下文
def streamCreatFun(sc : SparkContext, duration : Int, checkpointDir : String) = {
val context = new StreamingContext(sc, Seconds(duration))
context.checkpoint(checkpointDir)
//TODO doYourJob() 这里写具体的业务处理
context
}
}
StreamingContext.getOrCreate方法 源码展示:
注释的意思大概是:
要么从checkpoint获取一个StreamingContext,要么创建一个新的StreamingContext。
新建StreamingContext是使用传递的[creatingFunc
] 创建的.
hadoopConf 可选参数 Hadoop的配置
createOnError 可选参数 默认情况下,错误会引发异常
上一篇: 利用异或数据集演示欠拟合问题
下一篇: php对多维数组进行排序