Spark Streaming Task Delay Monitoring and Alerting
Overview
StreamingListener is Spark Streaming's event-listening mechanism: it exposes callbacks for the different stages of a streaming application (receiver lifecycle, batch submission, batch start and completion, and output operations).
The StreamingListener interface
// To listen for events from the various stages of Spark Streaming, simply override
// the corresponding callback in this trait; each method is documented below.
trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started. */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
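For delay monitoring, the relevant callback is onBatchCompleted: its BatchInfo exposes numRecords, schedulingDelay, processingDelay and totalDelay. As a minimal sketch (the class name BatchDelayLogger is hypothetical, not from the original), a listener that only logs these metrics could look like this:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Minimal sketch: log the delay metrics of every completed batch.
class BatchDelayLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // schedulingDelay: time the batch waited before processing started
    // processingDelay: time spent processing the batch
    // totalDelay: schedulingDelay + processingDelay
    println(s"batch=${info.batchTime}, records=${info.numRecords}, " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processingDelay=${info.processingDelay.getOrElse(-1L)} ms, " +
      s"totalDelay=${info.totalDelay.getOrElse(-1L)} ms")
  }
}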
A custom StreamingListener
Goal: monitor the batch processing time and send an alert whenever it exceeds a threshold, with at least 2 minutes between consecutive alerts.
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.slf4j.LoggerFactory

class SparkStreamingDelayListener(private val appName: String,
                                  private val duration: Int,
                                  private val times: Int) extends StreamingListener {

  private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

  // Executed every time a batch completes
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingStartTime = batchCompleted.batchInfo.processingStartTime
    val numRecords = batchCompleted.batchInfo.numRecords
    val processingEndTime = batchInfo.processingEndTime
    val processingDelay = batchInfo.processingDelay
    val totalDelay = batchInfo.totalDelay

    // Record the time of each alert in Redis, so we can check that alerts are more than 2 minutes apart
    val jedis = RedisClusterClient.getJedisClusterClient()
    val current_time = (System.currentTimeMillis / 1000).toInt
    val redis_time = jedis.get(appName)
    var flag = false
    if (redis_time == null || current_time - redis_time.toInt > 120) {
      jedis.set(appName, current_time.toString)
      flag = true
    }

    // Alert when the total batch delay exceeds the configured multiple of the batch duration
    // and more than 2 minutes have passed since the last alert
    if (totalDelay.get >= times * duration * 1000 && flag) {
      val monitorContent = appName + ": numRecords ->" + numRecords +
        ",processingDelay ->" + processingDelay.get / 1000 + " s,totalDelay -> " + totalDelay.get / 1000 + "s"
      println(monitorContent)
      val msg = "streaming_" + appName + "_delayTime:" + totalDelay.get / 1000 + "s"
      val getUrl = "http://node1:8002/message/weixin?msg=" + msg
      HttpClient.doGet(getUrl)
    }
  }
}
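The listener above depends on two project-specific helpers, RedisClusterClient and HttpClient, whose implementations are not shown in the original. A minimal sketch of what they might look like, using Jedis's JedisCluster and scala.io.Source (the Redis host and port are placeholders):

import redis.clients.jedis.{HostAndPort, JedisCluster}
import scala.io.Source

// Hypothetical helper: a shared JedisCluster client (the node address is a placeholder).
object RedisClusterClient {
  private lazy val cluster = new JedisCluster(new HostAndPort("redis-node1", 7000))
  def getJedisClusterClient(): JedisCluster = cluster
}

// Hypothetical helper: issue a simple HTTP GET and return the response body.
object HttpClient {
  def doGet(url: String): String = {
    val source = Source.fromURL(url)
    try source.mkString finally source.close()
  }
}

In practice the alert message should be URL-encoded (for example with java.net.URLEncoder) before it is appended to the query string.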
Application
// The StreamingListener needs no extra configuration; it is simply registered on the StreamingContext
object My {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(20))
    ssc.addStreamingListener(new SparkStreamingDelayListener("userid2redis", duration, times))
    ....
  }
}
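As a concrete wiring (the object name and parameter values below are illustrative, not from the original): with a 20-second batch interval and a threshold of 3 batch intervals, an alert is sent once totalDelay reaches 3 * 20 * 1000 ms = 60 s, and the Redis check throttles alerts to at most one every 2 minutes.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DelayMonitorExample {
  def main(args: Array[String]): Unit = {
    val batchSeconds = 20   // batch interval in seconds, matches the listener's duration parameter
    val delayMultiple = 3   // alert once totalDelay >= 3 batch intervals

    val sparkConf = new SparkConf().setAppName("userid2redis")
    val ssc = new StreamingContext(sparkConf, Seconds(batchSeconds))
    ssc.addStreamingListener(
      new SparkStreamingDelayListener("userid2redis", batchSeconds, delayMultiple))

    // ... define the DStream pipeline here ...

    ssc.start()
    ssc.awaitTermination()
  }
}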