
Spark Streaming Task Delay Monitoring and Alerting


Overview

StreamingListener is an event-listening mechanism that covers the various stages of a Spark Streaming application.

The StreamingListener interface

// To listen to the events of any stage in Spark Streaming, simply implement the corresponding
// methods of this trait. The trait itself is documented with comments.
trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started. */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
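
The batch events carry a BatchInfo object whose delay fields (schedulingDelay, processingDelay, totalDelay, all Option[Long] in milliseconds) drive the alerting logic in the next section. As a minimal sketch, a listener that only logs these metrics could look like the following (BatchDelayLogger is a hypothetical name; it assumes the standard spark-streaming and slf4j dependencies):

import org.slf4j.LoggerFactory
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Minimal sketch: log the delay metrics carried by each completed batch.
// schedulingDelay = time the batch waited before processing started
// processingDelay = time spent actually processing the batch
// totalDelay      = schedulingDelay + processingDelay
class BatchDelayLogger extends StreamingListener {
  private val logger = LoggerFactory.getLogger("BatchDelayLogger")

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    logger.info(s"batch=${info.batchTime} records=${info.numRecords} " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)} ms " +
      s"processingDelay=${info.processingDelay.getOrElse(-1L)} ms " +
      s"totalDelay=${info.totalDelay.getOrElse(-1L)} ms")
  }
}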

Custom StreamingListener

Goal: monitor the batch processing time and raise an alert when it exceeds a threshold, with at least 2 minutes between consecutive alerts.

import org.slf4j.LoggerFactory
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class SparkStreamingDelayListener(private val appName: String, private val duration: Int, private val times: Int) extends StreamingListener {

  private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

  // Executed each time a batch completes
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingStartTime = batchInfo.processingStartTime
    val numRecords = batchInfo.numRecords
    val processingEndTime = batchInfo.processingEndTime
    val processingDelay = batchInfo.processingDelay
    val totalDelay = batchInfo.totalDelay

    // Record the time of each alert in Redis, so that the alert interval can be kept above 2 minutes
    val jedis = RedisClusterClient.getJedisClusterClient()
    val currentTime = (System.currentTimeMillis / 1000).toInt
    val redisTime = jedis.get(appName)
    var flag = false
    if (redisTime == null || currentTime - redisTime.toInt > 120) {
      jedis.set(appName, currentTime.toString)
      flag = true
    }

    // Alert if the batch's total delay exceeds the specified multiple of the batch duration
    // and the interval since the last alert is greater than 2 minutes
    if (totalDelay.get >= times * duration * 1000 && flag) {
      val monitorContent = appName + ": numRecords -> " + numRecords + ", processingDelay -> " + processingDelay.get / 1000 + " s, totalDelay -> " + totalDelay.get / 1000 + " s"
      println(monitorContent)
      val msg = "streaming_" + appName + "_delayTime:" + totalDelay.get / 1000 + "s"
      val getUrl = "http://node1:8002/message/weixin?msg=" + msg
      HttpClient.doGet(getUrl)
    }
  }
}
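
The listener above relies on two helper objects that the article does not show, RedisClusterClient and HttpClient. A minimal sketch of what they might look like, assuming a Jedis cluster client and plain java.net for the HTTP GET (the object names match the calls above, but the node list and timeouts are placeholders):

import java.net.{HttpURLConnection, URL}
import scala.io.Source
import scala.collection.JavaConverters._
import redis.clients.jedis.{HostAndPort, JedisCluster}

// Hypothetical Redis helper: returns a shared JedisCluster handle.
// Replace the node list with your cluster's actual hosts.
object RedisClusterClient {
  private lazy val cluster: JedisCluster = {
    val nodes = Set(new HostAndPort("redis-node1", 6379), new HostAndPort("redis-node2", 6379))
    new JedisCluster(nodes.asJava)
  }
  def getJedisClusterClient(): JedisCluster = cluster
}

// Hypothetical HTTP helper: fires a simple GET request and returns the response body.
object HttpClient {
  def doGet(url: String): String = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    conn.setConnectTimeout(3000)
    conn.setReadTimeout(3000)
    try Source.fromInputStream(conn.getInputStream).mkString
    finally conn.disconnect()
  }
}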

Usage

// A StreamingListener does not need to be set in the configuration; it can be added directly to the StreamingContext
object My {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
        val ssc = new StreamingContext(sparkConf, Seconds(20))
        ssc.addStreamingListener(new SparkStreamingDelayListener("userid2redis", duration, times))

        ....
    }
}
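
For a concrete wiring, if the batch interval is 20 seconds and an alert should fire once the total delay reaches three batch intervals, the parameters could be passed as follows (a sketch with example values; DelayMonitorExample and the duration/times values are placeholders, not from the original article):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DelayMonitorExample {
  def main(args: Array[String]): Unit = {
    val batchSeconds = 20    // batch interval in seconds
    val delayMultiple = 3    // alert when totalDelay >= 3 * batch interval

    val sparkConf = new SparkConf().setAppName("userid2redis")
    val ssc = new StreamingContext(sparkConf, Seconds(batchSeconds))

    // duration is passed in seconds because the listener multiplies it by 1000
    ssc.addStreamingListener(
      new SparkStreamingDelayListener("userid2redis", batchSeconds, delayMultiple))

    // ... define the input DStreams and output operations here ...

    ssc.start()
    ssc.awaitTermination()
  }
}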
