欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spark2.1.0——内置Web框架详解

程序员文章站 2022-04-06 12:31:55
任何系统都需要提供监控功能,否则在运行期间发生一些异常时,我们将会束手无策。也许有人说,可以增加日志来解决这个问题。日志只能解决你的程序逻辑在运行期的监控,进而发现Bug,以及提供对业务有帮助的调试信息。当你的JVM进程奔溃或者程序响应速度很慢时,这些日志将毫无用处。好在JVM提供了jstat、js... ......

spark2.1.0——内置web框架详解

  任何系统都需要提供监控功能,否则在运行期间发生一些异常时,我们将会束手无策。也许有人说,可以增加日志来解决这个问题。日志只能解决你的程序逻辑在运行期的监控,进而发现bug,以及提供对业务有帮助的调试信息。当你的jvm进程奔溃或者程序响应速度很慢时,这些日志将毫无用处。好在jvm提供了jstat、jstack、jinfo、jmap、jhat等工具帮助我们分析,更有visualvm的可视化界面以更加直观的方式对jvm运行期的状况进行监控。此外,像tomcat、hadoop等服务都提供了基于web的监控页面,用浏览器能访问具有样式及布局,并提供丰富监控数据的页面无疑是一种简单、高效的方式。

  spark自然也提供了web页面来浏览监控数据,而且master、worker、driver根据自身功能提供了不同内容的web监控页面。无论是master、worker,还是driver,它们都使用了统一的web框架webui。master、worker及driver分别使用masterwebui、workerwebui及sparkui提供的web界面服务,后三者都继承自webui,并增加了个性化的功能。此外,在yarn或mesos模式下还有webui的另一个扩展实现historyserver。historyserver将会展现已经运行完成的应用程序信息。本章以sparkui为例,并深入分析webui的框架体系。

sparkui概述

  在大型分布式系统中,采用事件监听机制是最常见的。为什么要使用事件监听机制?假如spark ui采用scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到driver所在jvm的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况。由于函数调用多数情况下是同步调用,这就导致线程被阻塞,在分布式环境中,还可能因为网络问题,导致线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程可以继续执行后续逻辑进而被快速释放。线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。spark ui就是这样的服务,它的构成如图1所示。

Spark2.1.0——内置Web框架详解

图1       sparkui的组成

图1展示了sparkui中的各个组件,这里对这些组件作简单介绍:

  • sparklistenerevent事件的来源:包括dagscheduler、sparkcontext、driverendpoint、blockmanagermasterendpoint以及localschedulerbackend等,这些组件将会产生各种sparklistenerevent,并发送到listenerbus的事件队列中。driverendpoint是driver在standalone或local-cluster模式下与其他组件进行通信的组件,在《spark内核设计的艺术》一书的第9.9.2节有详细介绍。blockmanagermasterendpoint是driver对分配给应用的所有executor及其blockmanager进行统一管理的组件,在《spark内核设计的艺术》一书的6.8节详细介绍。localschedulerbackend是local模式下的调度后端接口,用于给任务分配资源或对任务的状态进行更新,在《spark内核设计的艺术》一书的7.8.2节详细介绍。
  • 事件总线listenerbus。根据3.3节对事件总线的介绍,我们知道listenerbus通过定时器将sparklistenerevent事件匹配到具体的sparklistener,进而改变各个sparklistener中的统计监控数据。
  • spark ui的界面。各个sparklistener内的统计监控数据将会被各种标签页和具体页面展示到web界面。标签页有stagestab、jobstab、executorstab、environmenttab以及storagetab。每个标签页中包含若干个页面,例如stagestab标签页中包含了allstagespage、stagepage及poolpage三个页面。
  • 控制台的展示。细心的读者会发现图1中还有sparkstatustracker(spark状态跟踪器)和consoleprogressbar(控制台进度条)两个组件。sparkstatustracker负责对job和stage的监控,其实际也是使用了jobprogresslistener中的监控数据,并额外进行了一些加工。consoleprogressbar负责将sparkstatustracker提供的数据打印到控制台上。从最终展现的角度来看,sparkstatustracker和consoleprogressbar不应该属于sparkui的组成部分,但是由于其实现与jobprogresslistener密切相关,所以将它们也放在了sparkui的内容中。

webui框架体系

  spark ui构建在webui的框架体系之上,因此应当首先了解webui。webui定义了一种web界面展现的框架,并提供返回json格式数据的web服务。webui用于展示一组标签页,webuitab定义了标签页的规范。每个标签页中包含着一组页面,webuipage定义了页面的规范。我们将首先了解webuipage和webuitab,最后从整体来看webui。

webuipage的定义

  任何的web界面往往由多个页面组成,每个页面都将提供不同的内容展示。webuipage是webui框架体系的页节点,定义了所有页面应当遵循的规范。抽象类webuipage的定义见代码清单1。

代码清单1  webuipage的定义

private[spark] abstract class webuipage(var prefix: string) {
  def render(request: httpservletrequest): seq[node]
  def renderjson(request: httpservletrequest): jvalue = jnothing
} 

webuipage定义了两个方法。

  • render:渲染页面;
  • renderjson:生成json。

webuipage在webui框架体系中的上一级节点(也可以称为父亲)可以是webui或者webuitab,其成员属性prefix将与上级节点的路径一起构成当前webuipage的访问路径。

webuitab的定义

         有时候web界面需要将多个页面作为一组内容放置在一起,这时候标签页是常见的展现形式。标签页webuitab定义了所有标签页的规范,并用于展现一组webuipage。抽象类webuitab的定义见代码清单2。

代码清单2  webuitab的定义

private[spark] abstract class webuitab(parent: webui, val prefix: string) {
  val pages = arraybuffer[webuipage]()
  val name = prefix.capitalize

  def attachpage(page: webuipage) {
    page.prefix = (prefix + "/" + page.prefix).stripsuffix("/")
    pages += page
  }

  def headertabs: seq[webuitab] = parent.gettabs

  def basepath: string = parent.getbasepath
}

根据代码清单2,可以看到webuitab有四个成员属性:

  • parent:上一级节点,即父亲。webuitab的父亲只能是webui。
  • prefix:当前webuitab的前缀。prefix将与上级节点的路径一起构成当前webuitab的访问路径。
  • pages:当前webuitab所包含的webuipage的缓冲数组。
  • name:当前webuitab的名称。name实际是对prefix的首字母转换成大写字母后取得。

此外,webuitab还有三个成员方法,下面介绍它们的作用:

  • attachpage:首先将当前webuitab的前缀与webuipage的前缀拼接,作为webuipage的访问路径。然后向pages中添加webuipage。
  • headertabs:获取父亲webui中的所有webuitab。此方法实际通过调用父亲webui的gettabs方法实现,gettabs方法请参阅下一小节——webui的定义。
  • basepath:获取父亲webui的基本路径。此方法实际通过调用父亲webui的getbasepath方法实现,getbasepath方法请参阅下一小节——webui的定义。。

webui的定义

  webui是spark实现的用于提供web界面展现的框架,凡是需要页面展现的地方都可以继承它来完成。webui定义了webui框架体系的规范。为便于理解,首先明确webui中各个成员属性的含义:

  • securitymanager:sparkenv中创建的安全管理器securitymanager,5.2节对securitymanager有详细介绍。
  • ssloptions:使用securitymanager获取spark.ssl.ui属性指定的webui的ssl(secure sockets layer 安全套接层)选项。
  • port:webui对外服务的端口。可以使用spark.ui.port属性进行配置。
  • conf:即sparkconf。
  • basepath:webui的基本路径。basepath默认为空字符串。
  • name:webui的名称。spark ui的name为sparkui。
  • tabs:webuitab的缓冲数组。
  • handlers:servletcontexthandler的缓冲数组。servletcontexthandler是jetty提供的api,负责对servletcontext进行处理。servletcontexthandler的使用及jetty的更多内容可以参阅附录c。
  • pagetohandlers:webuipage与servletcontexthandler缓冲数组之间的映射关系。由于webuipage的两个方法render和renderjson分别需要由一个对应的servletcontexthandler处理。所以一个webuipage对应两个servletcontexthandler。
  • serverinfo:用于缓存serverinfo,即webui的jetty服务器信息。
  • publichostname:当前webui的jetty服务的主机名。优先采用系统环境变量spark_public_dns指定的主机名,否则采用spark.driver.host属性指定的host,在没有前两个配置的时候,将默认使用工具类utils的localhostname方法(详见附录a)返回的主机名。
  • classname:过滤了$符号的当前类的简单名称。classname 是通过utils的getformattedclassname方法得到的。getformattedclassname方法的实现请看附录a。

了解了webui的成员属性,现在就可以理解其提供的各个方法了。webui提供的方法有:

  • getbasepath:获取basepath。
  • gettabs:获取tabs中的所有webuitab,并以scala的序列返回。
  • gethandlers:获取handlers中的所有servletcontexthandler,并以scala的序列返回。
  • getsecuritymanager:获取securitymanager。
  • attachhandler:给handlers缓存数组中添加servletcontexthandler,并且将此servletcontexthandler通过serverinfo的addhandler方法添加到jetty服务器中。attachhandler的实现见代码清单3。serverinfo的addhandler方法的请参阅附录c。

代码清单3  attachhandler的实现

  def attachhandler(handler: servletcontexthandler) {
    handlers += handler
    serverinfo.foreach(_.addhandler(handler))
  }
  • detachhandler:从handlers缓存数组中移除servletcontexthandler,并且将此servletcontexthandler通过serverinfo的removehandler方法从jetty服务器中移除。detachhandler的实现见代码清单4。serverinfo的removehandler方法的请参阅附录c。

代码清单4  detachhandler的实现

  def detachhandler(handler: servletcontexthandler) {
    handlers -= handler
    serverinfo.foreach(_.removehandler(handler))
  }
  • attachpage:首先调用工具类jettyutils[1]的createservlethandler方法给webuipage创建与render和renderjson两个方法分别关联的servletcontexthandler,然后通过attachhandler方法添加到handlers缓存数组与jetty服务器中,最后把webuipage与这两个servletcontexthandler的映射关系更新到pagetohandlers中。attachpage的实现见代码清单5。

代码清单5  attachpage的实现

  def attachpage(page: webuipage) {
    val pagepath = "/" + page.prefix
    val renderhandler = createservlethandler(pagepath,
      (request: httpservletrequest) => page.render(request), securitymanager, conf, basepath)
    val renderjsonhandler = createservlethandler(pagepath.stripsuffix("/") + "/json",
      (request: httpservletrequest) => page.renderjson(request), securitymanager, conf, basepath)
    attachhandler(renderhandler)
    attachhandler(renderjsonhandler)
    val handlers = pagetohandlers.getorelseupdate(page, arraybuffer[servletcontexthandler]())
    handlers += renderhandler
  }
  • detachpage:作用与attachpage相反。detachpage的实现见代码清单6。

代码清单6  detachpage的实现

  def detachpage(page: webuipage) {
    pagetohandlers.remove(page).foreach(_.foreach(detachhandler))
  }
  • attachtab:首先向tabs中添加webuitab,然后给webuitab中的每个webuipage施加attachpage方法。attachtab的实现见代码清单7。

代码清单7  attachtab的实现

  def attachtab(tab: webuitab) {
    tab.pages.foreach(attachpage)
    tabs += tab
  }
  • detachtab:作用与attachtab相反。detachtab的实现见代码清单8。

代码清单8  detachtab的实现

  def detachtab(tab: webuitab) {
    tab.pages.foreach(detachpage)
    tabs -= tab
  }
  • addstatichandler:首先调用工具类jettyutils的createstatichandler方法创建静态文件服务的servletcontexthandler,然后施加attachhandler方法。addstatichandler的实现见代码清单9。jettyutils的createstatichandler方法的实现见附录c。

代码清单9     addstatichandler的实现

  def addstatichandler(resourcebase: string, path: string): unit = {
    attachhandler(jettyutils.createstatichandler(resourcebase, path))
  }
  • removestatichandler:作用与addstatichandler相反。removestatichandler的实现见代码清单10。

代码清单10         removestatichandler的实现

  def removestatichandler(path: string): unit = {
    handlers.find(_.getcontextpath() == path).foreach(detachhandler)
  }
  • initialize:用于初始化webui服务中的所有组件。webui中此方法未实现,需要子类实现。
  • bind:启动与webui绑定的jetty服务。bind方法的实现见代码清单11。

代码清单11         bind的实现

  def bind() {
    assert(!serverinfo.isdefined, s"attempted to bind $classname more than once!")
    try {
      val host = option(conf.getenv("spark_local_ip")).getorelse("0.0.0.0")
      serverinfo = some(startjettyserver(host, port, ssloptions, handlers, conf, name))
      loginfo(s"bound $classname to $host, and started at $weburl")
    } catch {
      case e: exception =>
        logerror(s"failed to bind $classname", e)
        system.exit(1)
    }
  }
  • weburl:获取webui的web界面的url。weburl的实现如下:
  def weburl: string = shttp://$publichostname:$boundport
  • boundport:获取webui的jetty服务的端口。boundport的实现如下:
  def boundport: int = serverinfo.map(_.boundport).getorelse(-1)
  • stop:停止webui。实际是停止webui底层的jetty服务。stop方法的实现见代码清单12。

代码清单12         stop方法的实现

  def stop() {
    assert(serverinfo.isdefined,
      s"attempted to stop $classname before binding to a server!")
    serverinfo.get.stop()
  }

 创建sparkui

  在sparkcontext的初始化过程中,会创建sparkui。有了对webui的总体认识,现在是时候了解sparkcontext是如何构造sparkui的了。sparkui是webui框架的使用范例,了解了sparkui的创建过程,读者对masterwebui、workerwebui及historyserver的创建过程也必然了然于心。创建sparkui的代码如下:

    _statustracker = new sparkstatustracker(this)

    _progressbar =
      if (_conf.getboolean("spark.ui.showconsoleprogress", true) && !log.isinfoenabled) {
        some(new consoleprogressbar(this))
      } else {
        none
      }

    _ui =
      if (conf.getboolean("spark.ui.enabled", true)) {
        some(sparkui.createliveui(this, _conf, listenerbus, _jobprogresslistener,
          _env.securitymanager, appname, starttime = starttime))
      } else {
        // for tests, do not enable the ui
        none
      }
    _ui.foreach(_.bind())

这段代码的执行步骤如下。

1)  创建spark状态跟踪器sparkstatustracker。

2)  创建consoleprogressbar。可以配置spark.ui.showconsoleprogress属性为false取消对consoleprogressbar的创建,此属性默认为true。

3)  调用sparkui的createliveui方法创建sparkui。

4)  给sparkui绑定端口。sparkui继承自webui,因此调用了代码清单4-12中webui的bind方法启动sparkui底层的jetty服务。

         上述步骤中,第1)、2)、4)步都很简单,所以着重来分析第3)步。sparkui的createliveui的实现如下。

  def createliveui(
      sc: sparkcontext,
      conf: sparkconf,
      listenerbus: sparklistenerbus,
      jobprogresslistener: jobprogresslistener,
      securitymanager: securitymanager,
      appname: string,
      starttime: long): sparkui = {
    create(some(sc), conf, listenerbus, securitymanager, appname,
      jobprogresslistener = some(jobprogresslistener), starttime = starttime)
  }

可以看到sparkui的createliveui方法中调用了create方法。create的实现如下。

  private def create(
      sc: option[sparkcontext],
      conf: sparkconf,
      listenerbus: sparklistenerbus,
      securitymanager: securitymanager,
      appname: string,
      basepath: string = "",
      jobprogresslistener: option[jobprogresslistener] = none,
      starttime: long): sparkui = {

    val _jobprogresslistener: jobprogresslistener = jobprogresslistener.getorelse {
      val listener = new jobprogresslistener(conf)
      listenerbus.addlistener(listener)
      listener
    }

    val environmentlistener = new environmentlistener
    val storagestatuslistener = new storagestatuslistener(conf)
    val executorslistener = new executorslistener(storagestatuslistener, conf)
    val storagelistener = new storagelistener(storagestatuslistener)
    val operationgraphlistener = new rddoperationgraphlistener(conf)

    listenerbus.addlistener(environmentlistener)
    listenerbus.addlistener(storagestatuslistener)
    listenerbus.addlistener(executorslistener)
    listenerbus.addlistener(storagelistener)
    listenerbus.addlistener(operationgraphlistener)

    new sparkui(sc, conf, securitymanager, environmentlistener, storagestatuslistener,
      executorslistener, _jobprogresslistener, storagelistener, operationgraphlistener,
      appname, basepath, starttime)
  }

可以看到create方法里除了jobprogresslistener是外部传入的之外,又增加了一些sparklistener,例如用于对jvm参数、spark属性、java系统属性、classpath等进行监控的environmentlistener;用于维护executor的存储状态的storagestatuslistener;用于准备将executor的信息展示在executorstab的executorslistener;用于准备将executor相关存储信息展示在blockmanagerui的storagelistener;用于构建rdd的dag(有向无关图)的rddoperationgraphlistener等。这5个sparklistener的实现添加到listenerbus的监听器列表中。最后使用sparkui的构造器创建sparkui。

sparkui的初始化

  调用sparkui的构造器创建sparkui,实际也是对sparkui的初始化过程。在介绍初始化之前,先来看看sparkui中的两个成员属性。

  • killenabled:标记当前sparkui能否提供杀死stage或者job的链接。
  • appid:当前应用的id。

sparkui的构造过程中会执行initialize方法,其实现见代码清单13。

代码清单13         sparkui的初始化

  def initialize() {
    val jobstab = new jobstab(this)
    attachtab(jobstab)
    val stagestab = new stagestab(this)
    attachtab(stagestab)
    attachtab(new storagetab(this))
    attachtab(new environmenttab(this))
    attachtab(new executorstab(this))
    attachhandler(createstatichandler(sparkui.static_resource_dir, "/static"))
    attachhandler(createredirecthandler("/", "/jobs/", basepath = basepath))
    attachhandler(apirootresource.getservlethandler(this))
    // these should be post only, but, the yarn am proxy won't proxy posts
    attachhandler(createredirecthandler(
      "/jobs/job/kill", "/jobs/", jobstab.handlekillrequest, httpmethods = set("get", "post")))
    attachhandler(createredirecthandler(
      "/stages/stage/kill", "/stages/", stagestab.handlekillrequest,
      httpmethods = set("get", "post")))
  }
  initialize()

根据代码清单13,sparkui的初始化步骤如下。

1)  构建页面布局并给每个webuitab中的所有webuipage创建对应的servletcontexthandler。这一步使用了代码清单4-8中展示的attachtab方法。

2)  调用jettyutils的createstatichandler方法创建对静态目录org/apache/spark/ui/static提供文件服务的servletcontexthandler,并使用attachhandler方法追加到sparkui的服务中。

3)  调用jettyutils的createredirecthandler方法创建几个将用户对源路径的请求重定向到目标路径的servletcontexthandler。例如,将用户对根路径"/"的请求重定向到目标路径"/jobs/"的servletcontexthandler。

sparkui的页面布局与展示

  sparkui究竟是如何实现页面布局及展示的? 由于所有标签页都继承了sparkuitab,所以我们先来看看sparkuitab的实现:

private[spark] abstract class sparkuitab(parent: sparkui, prefix: string)
  extends webuitab(parent, prefix) {
  def appname: string = parent.getappname
}

根据上述代码,我们知道sparkuitab继承了webuitab,并在实现中增加了一个用于获取当前应用名称的方法appname。environmenttab是用于展示jvm、spark属性、系统属性、类路径等相关信息的标签页,由于其实现简单且能说明问题,所以本节挑选environmenttab作为示例解答本节一开始提出的问题。

         environmenttab的实现见代码清单14。

代码清单14         environmenttab的实现

private[ui] class environmenttab(parent: sparkui) extends sparkuitab(parent, "environment") {
  val listener = parent.environmentlistener
  attachpage(new environmentpage(this))
}

根据代码清单14,我们知道environmenttab引用了sparkui的environmentlistener(类型为environmentlistener),并且包含environmentpage这个页面。environmenttab通过调用attachpage方法将environmentpage与jetty服务关联起来。根据代码清单5中attachpage的实现,创建的renderhandler将采用偏函数(request: httpservletrequest) => page.render(request) 处理请求,因而会调用environmentpage的render方法。environmentpage的render方法将会渲染页面元素。environmentpage的实现见代码清单15。

代码清单15         environmentpage的实现

private[ui] class environmentpage(parent: environmenttab) extends webuipage("") {
  private val listener = parent.listener

  private def removepass(kv: (string, string)): (string, string) = {
    if (kv._1.tolowercase.contains("password") || kv._1.tolowercase.contains("secret")) {
      (kv._1, "******")
    } else kv
  }

  def render(request: httpservletrequest): seq[node] = {
   // 调用uiutils的listingtable方法生成jvm运行时信息、spark属性信息、系统属性信息、类路径信息的表格 
   val runtimeinformationtable = uiutils.listingtable(
      propertyheader, jvmrow, listener.jvminformation, fixedwidth = true)
    val sparkpropertiestable = uiutils.listingtable(
      propertyheader, propertyrow, listener.sparkproperties.map(removepass), fixedwidth = true)
    val systempropertiestable = uiutils.listingtable(
      propertyheader, propertyrow, listener.systemproperties, fixedwidth = true)
    val classpathentriestable = uiutils.listingtable(
      classpathheaders, classpathrow, listener.classpathentries, fixedwidth = true)
    val content =
      <span>
        <h4>runtime information</h4> {runtimeinformationtable}
        <h4>spark properties</h4> {sparkpropertiestable}
        <h4>system properties</h4> {systempropertiestable}
        <h4>classpath entries</h4> {classpathentriestable}
      </span>
    // 调用uiutils的headersparkpage方法封装好css、js、header及页面布局等
    uiutils.headersparkpage("environment", content, parent)
  }
  // 定义jvm运行时信息、spark属性信息、系统属性信息的表格头部propertyheader和类路径信息的表格头部   
  // classpathheaders
  private def propertyheader = seq("name", "value")
  private def classpathheaders = seq("resource", "source")
  // 定义jvm运行时信息的表格中每行数据的生成方法jvmrow
  private def jvmrow(kv: (string, string)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
  private def propertyrow(kv: (string, string)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
  private def classpathrow(data: (string, string)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
}

根据代码清单15,environmentpage的render方法利用从父节点environmenttab中得到的environmentlistener中的统计监控数据生成jvm运行时、spark属性、系统属性以及类路径等状态的摘要信息。以jvm运行时为例,页面渲染的步骤如下:

1)  定义jvm运行时信息、spark属性信息、系统属性信息的表格头部propertyheader和类路径信息的表格头部classpathheaders。

2)  定义jvm运行时信息的表格中每行数据的生成方法jvmrow。

3)  调用uiutils的listingtable方法生成jvm运行时信息、spark属性信息、系统属性信息、类路径信息的表格。

4)  调用uiutils的headersparkpage方法封装好css、js、header及页面布局等。

uiutils工具类的实现细节留给感兴趣的读者自行查阅,本文不多赘述。


[1]本节内容用到jettyutils中的很多方法,读者可以在附录c中找到相应的实现与说明。

关于《spark内核设计的艺术 架构设计与实现》

经过近一年的准备,基于spark2.1.0版本的《spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

Spark2.1.0——内置Web框架详解

 

纸质版售卖链接如下:

京东: