Spark2.1.0——内置Web框架详解
spark2.1.0——内置web框架详解
任何系统都需要提供监控功能,否则在运行期间发生一些异常时,我们将会束手无策。也许有人说,可以增加日志来解决这个问题。日志只能解决你的程序逻辑在运行期的监控,进而发现bug,以及提供对业务有帮助的调试信息。当你的jvm进程奔溃或者程序响应速度很慢时,这些日志将毫无用处。好在jvm提供了jstat、jstack、jinfo、jmap、jhat等工具帮助我们分析,更有visualvm的可视化界面以更加直观的方式对jvm运行期的状况进行监控。此外,像tomcat、hadoop等服务都提供了基于web的监控页面,用浏览器能访问具有样式及布局,并提供丰富监控数据的页面无疑是一种简单、高效的方式。
spark自然也提供了web页面来浏览监控数据,而且master、worker、driver根据自身功能提供了不同内容的web监控页面。无论是master、worker,还是driver,它们都使用了统一的web框架webui。master、worker及driver分别使用masterwebui、workerwebui及sparkui提供的web界面服务,后三者都继承自webui,并增加了个性化的功能。此外,在yarn或mesos模式下还有webui的另一个扩展实现historyserver。historyserver将会展现已经运行完成的应用程序信息。本章以sparkui为例,并深入分析webui的框架体系。
sparkui概述
在大型分布式系统中,采用事件监听机制是最常见的。为什么要使用事件监听机制?假如spark ui采用scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到driver所在jvm的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况。由于函数调用多数情况下是同步调用,这就导致线程被阻塞,在分布式环境中,还可能因为网络问题,导致线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程可以继续执行后续逻辑进而被快速释放。线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。spark ui就是这样的服务,它的构成如图1所示。
图1 sparkui的组成
图1展示了sparkui中的各个组件,这里对这些组件作简单介绍:
- sparklistenerevent事件的来源:包括dagscheduler、sparkcontext、driverendpoint、blockmanagermasterendpoint以及localschedulerbackend等,这些组件将会产生各种sparklistenerevent,并发送到listenerbus的事件队列中。driverendpoint是driver在standalone或local-cluster模式下与其他组件进行通信的组件,在《spark内核设计的艺术》一书的第9.9.2节有详细介绍。blockmanagermasterendpoint是driver对分配给应用的所有executor及其blockmanager进行统一管理的组件,在《spark内核设计的艺术》一书的6.8节详细介绍。localschedulerbackend是local模式下的调度后端接口,用于给任务分配资源或对任务的状态进行更新,在《spark内核设计的艺术》一书的7.8.2节详细介绍。
- 事件总线listenerbus。根据3.3节对事件总线的介绍,我们知道listenerbus通过定时器将sparklistenerevent事件匹配到具体的sparklistener,进而改变各个sparklistener中的统计监控数据。
- spark ui的界面。各个sparklistener内的统计监控数据将会被各种标签页和具体页面展示到web界面。标签页有stagestab、jobstab、executorstab、environmenttab以及storagetab。每个标签页中包含若干个页面,例如stagestab标签页中包含了allstagespage、stagepage及poolpage三个页面。
- 控制台的展示。细心的读者会发现图1中还有sparkstatustracker(spark状态跟踪器)和consoleprogressbar(控制台进度条)两个组件。sparkstatustracker负责对job和stage的监控,其实际也是使用了jobprogresslistener中的监控数据,并额外进行了一些加工。consoleprogressbar负责将sparkstatustracker提供的数据打印到控制台上。从最终展现的角度来看,sparkstatustracker和consoleprogressbar不应该属于sparkui的组成部分,但是由于其实现与jobprogresslistener密切相关,所以将它们也放在了sparkui的内容中。
webui框架体系
spark ui构建在webui的框架体系之上,因此应当首先了解webui。webui定义了一种web界面展现的框架,并提供返回json格式数据的web服务。webui用于展示一组标签页,webuitab定义了标签页的规范。每个标签页中包含着一组页面,webuipage定义了页面的规范。我们将首先了解webuipage和webuitab,最后从整体来看webui。
webuipage的定义
任何的web界面往往由多个页面组成,每个页面都将提供不同的内容展示。webuipage是webui框架体系的页节点,定义了所有页面应当遵循的规范。抽象类webuipage的定义见代码清单1。
代码清单1 webuipage的定义
private[spark] abstract class webuipage(var prefix: string) {
def render(request: httpservletrequest): seq[node]
def renderjson(request: httpservletrequest): jvalue = jnothing
}
webuipage定义了两个方法。
- render:渲染页面;
- renderjson:生成json。
webuipage在webui框架体系中的上一级节点(也可以称为父亲)可以是webui或者webuitab,其成员属性prefix将与上级节点的路径一起构成当前webuipage的访问路径。
webuitab的定义
有时候web界面需要将多个页面作为一组内容放置在一起,这时候标签页是常见的展现形式。标签页webuitab定义了所有标签页的规范,并用于展现一组webuipage。抽象类webuitab的定义见代码清单2。
代码清单2 webuitab的定义
private[spark] abstract class webuitab(parent: webui, val prefix: string) {
val pages = arraybuffer[webuipage]()
val name = prefix.capitalize
def attachpage(page: webuipage) {
page.prefix = (prefix + "/" + page.prefix).stripsuffix("/")
pages += page
}
def headertabs: seq[webuitab] = parent.gettabs
def basepath: string = parent.getbasepath
}
根据代码清单2,可以看到webuitab有四个成员属性:
- parent:上一级节点,即父亲。webuitab的父亲只能是webui。
- prefix:当前webuitab的前缀。prefix将与上级节点的路径一起构成当前webuitab的访问路径。
- pages:当前webuitab所包含的webuipage的缓冲数组。
- name:当前webuitab的名称。name实际是对prefix的首字母转换成大写字母后取得。
此外,webuitab还有三个成员方法,下面介绍它们的作用:
- attachpage:首先将当前webuitab的前缀与webuipage的前缀拼接,作为webuipage的访问路径。然后向pages中添加webuipage。
- headertabs:获取父亲webui中的所有webuitab。此方法实际通过调用父亲webui的gettabs方法实现,gettabs方法请参阅下一小节——webui的定义。
- basepath:获取父亲webui的基本路径。此方法实际通过调用父亲webui的getbasepath方法实现,getbasepath方法请参阅下一小节——webui的定义。。
webui的定义
webui是spark实现的用于提供web界面展现的框架,凡是需要页面展现的地方都可以继承它来完成。webui定义了webui框架体系的规范。为便于理解,首先明确webui中各个成员属性的含义:
- securitymanager:sparkenv中创建的安全管理器securitymanager,5.2节对securitymanager有详细介绍。
- ssloptions:使用securitymanager获取spark.ssl.ui属性指定的webui的ssl(secure sockets layer 安全套接层)选项。
- port:webui对外服务的端口。可以使用spark.ui.port属性进行配置。
- conf:即sparkconf。
- basepath:webui的基本路径。basepath默认为空字符串。
- name:webui的名称。spark ui的name为sparkui。
- tabs:webuitab的缓冲数组。
- handlers:servletcontexthandler的缓冲数组。servletcontexthandler是jetty提供的api,负责对servletcontext进行处理。servletcontexthandler的使用及jetty的更多内容可以参阅附录c。
- pagetohandlers:webuipage与servletcontexthandler缓冲数组之间的映射关系。由于webuipage的两个方法render和renderjson分别需要由一个对应的servletcontexthandler处理。所以一个webuipage对应两个servletcontexthandler。
- serverinfo:用于缓存serverinfo,即webui的jetty服务器信息。
- publichostname:当前webui的jetty服务的主机名。优先采用系统环境变量spark_public_dns指定的主机名,否则采用spark.driver.host属性指定的host,在没有前两个配置的时候,将默认使用工具类utils的localhostname方法(详见附录a)返回的主机名。
- classname:过滤了$符号的当前类的简单名称。classname 是通过utils的getformattedclassname方法得到的。getformattedclassname方法的实现请看附录a。
了解了webui的成员属性,现在就可以理解其提供的各个方法了。webui提供的方法有:
- getbasepath:获取basepath。
- gettabs:获取tabs中的所有webuitab,并以scala的序列返回。
- gethandlers:获取handlers中的所有servletcontexthandler,并以scala的序列返回。
- getsecuritymanager:获取securitymanager。
- attachhandler:给handlers缓存数组中添加servletcontexthandler,并且将此servletcontexthandler通过serverinfo的addhandler方法添加到jetty服务器中。attachhandler的实现见代码清单3。serverinfo的addhandler方法的请参阅附录c。
代码清单3 attachhandler的实现
def attachhandler(handler: servletcontexthandler) {
handlers += handler
serverinfo.foreach(_.addhandler(handler))
}
- detachhandler:从handlers缓存数组中移除servletcontexthandler,并且将此servletcontexthandler通过serverinfo的removehandler方法从jetty服务器中移除。detachhandler的实现见代码清单4。serverinfo的removehandler方法的请参阅附录c。
代码清单4 detachhandler的实现
def detachhandler(handler: servletcontexthandler) {
handlers -= handler
serverinfo.foreach(_.removehandler(handler))
}
- attachpage:首先调用工具类jettyutils[1]的createservlethandler方法给webuipage创建与render和renderjson两个方法分别关联的servletcontexthandler,然后通过attachhandler方法添加到handlers缓存数组与jetty服务器中,最后把webuipage与这两个servletcontexthandler的映射关系更新到pagetohandlers中。attachpage的实现见代码清单5。
代码清单5 attachpage的实现
def attachpage(page: webuipage) {
val pagepath = "/" + page.prefix
val renderhandler = createservlethandler(pagepath,
(request: httpservletrequest) => page.render(request), securitymanager, conf, basepath)
val renderjsonhandler = createservlethandler(pagepath.stripsuffix("/") + "/json",
(request: httpservletrequest) => page.renderjson(request), securitymanager, conf, basepath)
attachhandler(renderhandler)
attachhandler(renderjsonhandler)
val handlers = pagetohandlers.getorelseupdate(page, arraybuffer[servletcontexthandler]())
handlers += renderhandler
}
- detachpage:作用与attachpage相反。detachpage的实现见代码清单6。
代码清单6 detachpage的实现
def detachpage(page: webuipage) {
pagetohandlers.remove(page).foreach(_.foreach(detachhandler))
}
- attachtab:首先向tabs中添加webuitab,然后给webuitab中的每个webuipage施加attachpage方法。attachtab的实现见代码清单7。
代码清单7 attachtab的实现
def attachtab(tab: webuitab) {
tab.pages.foreach(attachpage)
tabs += tab
}
- detachtab:作用与attachtab相反。detachtab的实现见代码清单8。
代码清单8 detachtab的实现
def detachtab(tab: webuitab) {
tab.pages.foreach(detachpage)
tabs -= tab
}
- addstatichandler:首先调用工具类jettyutils的createstatichandler方法创建静态文件服务的servletcontexthandler,然后施加attachhandler方法。addstatichandler的实现见代码清单9。jettyutils的createstatichandler方法的实现见附录c。
代码清单9 addstatichandler的实现
def addstatichandler(resourcebase: string, path: string): unit = {
attachhandler(jettyutils.createstatichandler(resourcebase, path))
}
- removestatichandler:作用与addstatichandler相反。removestatichandler的实现见代码清单10。
代码清单10 removestatichandler的实现
def removestatichandler(path: string): unit = {
handlers.find(_.getcontextpath() == path).foreach(detachhandler)
}
- initialize:用于初始化webui服务中的所有组件。webui中此方法未实现,需要子类实现。
- bind:启动与webui绑定的jetty服务。bind方法的实现见代码清单11。
代码清单11 bind的实现
def bind() {
assert(!serverinfo.isdefined, s"attempted to bind $classname more than once!")
try {
val host = option(conf.getenv("spark_local_ip")).getorelse("0.0.0.0")
serverinfo = some(startjettyserver(host, port, ssloptions, handlers, conf, name))
loginfo(s"bound $classname to $host, and started at $weburl")
} catch {
case e: exception =>
logerror(s"failed to bind $classname", e)
system.exit(1)
}
}
- weburl:获取webui的web界面的url。weburl的实现如下:
def weburl: string = shttp://$publichostname:$boundport
- boundport:获取webui的jetty服务的端口。boundport的实现如下:
def boundport: int = serverinfo.map(_.boundport).getorelse(-1)
- stop:停止webui。实际是停止webui底层的jetty服务。stop方法的实现见代码清单12。
代码清单12 stop方法的实现
def stop() {
assert(serverinfo.isdefined,
s"attempted to stop $classname before binding to a server!")
serverinfo.get.stop()
}
创建sparkui
在sparkcontext的初始化过程中,会创建sparkui。有了对webui的总体认识,现在是时候了解sparkcontext是如何构造sparkui的了。sparkui是webui框架的使用范例,了解了sparkui的创建过程,读者对masterwebui、workerwebui及historyserver的创建过程也必然了然于心。创建sparkui的代码如下:
_statustracker = new sparkstatustracker(this)
_progressbar =
if (_conf.getboolean("spark.ui.showconsoleprogress", true) && !log.isinfoenabled) {
some(new consoleprogressbar(this))
} else {
none
}
_ui =
if (conf.getboolean("spark.ui.enabled", true)) {
some(sparkui.createliveui(this, _conf, listenerbus, _jobprogresslistener,
_env.securitymanager, appname, starttime = starttime))
} else {
// for tests, do not enable the ui
none
}
_ui.foreach(_.bind())
这段代码的执行步骤如下。
1) 创建spark状态跟踪器sparkstatustracker。
2) 创建consoleprogressbar。可以配置spark.ui.showconsoleprogress属性为false取消对consoleprogressbar的创建,此属性默认为true。
3) 调用sparkui的createliveui方法创建sparkui。
4) 给sparkui绑定端口。sparkui继承自webui,因此调用了代码清单4-12中webui的bind方法启动sparkui底层的jetty服务。
上述步骤中,第1)、2)、4)步都很简单,所以着重来分析第3)步。sparkui的createliveui的实现如下。
def createliveui(
sc: sparkcontext,
conf: sparkconf,
listenerbus: sparklistenerbus,
jobprogresslistener: jobprogresslistener,
securitymanager: securitymanager,
appname: string,
starttime: long): sparkui = {
create(some(sc), conf, listenerbus, securitymanager, appname,
jobprogresslistener = some(jobprogresslistener), starttime = starttime)
}
可以看到sparkui的createliveui方法中调用了create方法。create的实现如下。
private def create(
sc: option[sparkcontext],
conf: sparkconf,
listenerbus: sparklistenerbus,
securitymanager: securitymanager,
appname: string,
basepath: string = "",
jobprogresslistener: option[jobprogresslistener] = none,
starttime: long): sparkui = {
val _jobprogresslistener: jobprogresslistener = jobprogresslistener.getorelse {
val listener = new jobprogresslistener(conf)
listenerbus.addlistener(listener)
listener
}
val environmentlistener = new environmentlistener
val storagestatuslistener = new storagestatuslistener(conf)
val executorslistener = new executorslistener(storagestatuslistener, conf)
val storagelistener = new storagelistener(storagestatuslistener)
val operationgraphlistener = new rddoperationgraphlistener(conf)
listenerbus.addlistener(environmentlistener)
listenerbus.addlistener(storagestatuslistener)
listenerbus.addlistener(executorslistener)
listenerbus.addlistener(storagelistener)
listenerbus.addlistener(operationgraphlistener)
new sparkui(sc, conf, securitymanager, environmentlistener, storagestatuslistener,
executorslistener, _jobprogresslistener, storagelistener, operationgraphlistener,
appname, basepath, starttime)
}
可以看到create方法里除了jobprogresslistener是外部传入的之外,又增加了一些sparklistener,例如用于对jvm参数、spark属性、java系统属性、classpath等进行监控的environmentlistener;用于维护executor的存储状态的storagestatuslistener;用于准备将executor的信息展示在executorstab的executorslistener;用于准备将executor相关存储信息展示在blockmanagerui的storagelistener;用于构建rdd的dag(有向无关图)的rddoperationgraphlistener等。这5个sparklistener的实现添加到listenerbus的监听器列表中。最后使用sparkui的构造器创建sparkui。
sparkui的初始化
调用sparkui的构造器创建sparkui,实际也是对sparkui的初始化过程。在介绍初始化之前,先来看看sparkui中的两个成员属性。
- killenabled:标记当前sparkui能否提供杀死stage或者job的链接。
- appid:当前应用的id。
sparkui的构造过程中会执行initialize方法,其实现见代码清单13。
代码清单13 sparkui的初始化
def initialize() {
val jobstab = new jobstab(this)
attachtab(jobstab)
val stagestab = new stagestab(this)
attachtab(stagestab)
attachtab(new storagetab(this))
attachtab(new environmenttab(this))
attachtab(new executorstab(this))
attachhandler(createstatichandler(sparkui.static_resource_dir, "/static"))
attachhandler(createredirecthandler("/", "/jobs/", basepath = basepath))
attachhandler(apirootresource.getservlethandler(this))
// these should be post only, but, the yarn am proxy won't proxy posts
attachhandler(createredirecthandler(
"/jobs/job/kill", "/jobs/", jobstab.handlekillrequest, httpmethods = set("get", "post")))
attachhandler(createredirecthandler(
"/stages/stage/kill", "/stages/", stagestab.handlekillrequest,
httpmethods = set("get", "post")))
}
initialize()
根据代码清单13,sparkui的初始化步骤如下。
1) 构建页面布局并给每个webuitab中的所有webuipage创建对应的servletcontexthandler。这一步使用了代码清单4-8中展示的attachtab方法。
2) 调用jettyutils的createstatichandler方法创建对静态目录org/apache/spark/ui/static提供文件服务的servletcontexthandler,并使用attachhandler方法追加到sparkui的服务中。
3) 调用jettyutils的createredirecthandler方法创建几个将用户对源路径的请求重定向到目标路径的servletcontexthandler。例如,将用户对根路径"/"的请求重定向到目标路径"/jobs/"的servletcontexthandler。
sparkui的页面布局与展示
sparkui究竟是如何实现页面布局及展示的? 由于所有标签页都继承了sparkuitab,所以我们先来看看sparkuitab的实现:
private[spark] abstract class sparkuitab(parent: sparkui, prefix: string)
extends webuitab(parent, prefix) {
def appname: string = parent.getappname
}
根据上述代码,我们知道sparkuitab继承了webuitab,并在实现中增加了一个用于获取当前应用名称的方法appname。environmenttab是用于展示jvm、spark属性、系统属性、类路径等相关信息的标签页,由于其实现简单且能说明问题,所以本节挑选environmenttab作为示例解答本节一开始提出的问题。
environmenttab的实现见代码清单14。
代码清单14 environmenttab的实现
private[ui] class environmenttab(parent: sparkui) extends sparkuitab(parent, "environment") {
val listener = parent.environmentlistener
attachpage(new environmentpage(this))
}
根据代码清单14,我们知道environmenttab引用了sparkui的environmentlistener(类型为environmentlistener),并且包含environmentpage这个页面。environmenttab通过调用attachpage方法将environmentpage与jetty服务关联起来。根据代码清单5中attachpage的实现,创建的renderhandler将采用偏函数(request: httpservletrequest) => page.render(request) 处理请求,因而会调用environmentpage的render方法。environmentpage的render方法将会渲染页面元素。environmentpage的实现见代码清单15。
代码清单15 environmentpage的实现
private[ui] class environmentpage(parent: environmenttab) extends webuipage("") {
private val listener = parent.listener
private def removepass(kv: (string, string)): (string, string) = {
if (kv._1.tolowercase.contains("password") || kv._1.tolowercase.contains("secret")) {
(kv._1, "******")
} else kv
}
def render(request: httpservletrequest): seq[node] = {
// 调用uiutils的listingtable方法生成jvm运行时信息、spark属性信息、系统属性信息、类路径信息的表格
val runtimeinformationtable = uiutils.listingtable(
propertyheader, jvmrow, listener.jvminformation, fixedwidth = true)
val sparkpropertiestable = uiutils.listingtable(
propertyheader, propertyrow, listener.sparkproperties.map(removepass), fixedwidth = true)
val systempropertiestable = uiutils.listingtable(
propertyheader, propertyrow, listener.systemproperties, fixedwidth = true)
val classpathentriestable = uiutils.listingtable(
classpathheaders, classpathrow, listener.classpathentries, fixedwidth = true)
val content =
<span>
<h4>runtime information</h4> {runtimeinformationtable}
<h4>spark properties</h4> {sparkpropertiestable}
<h4>system properties</h4> {systempropertiestable}
<h4>classpath entries</h4> {classpathentriestable}
</span>
// 调用uiutils的headersparkpage方法封装好css、js、header及页面布局等
uiutils.headersparkpage("environment", content, parent)
}
// 定义jvm运行时信息、spark属性信息、系统属性信息的表格头部propertyheader和类路径信息的表格头部
// classpathheaders
private def propertyheader = seq("name", "value")
private def classpathheaders = seq("resource", "source")
// 定义jvm运行时信息的表格中每行数据的生成方法jvmrow
private def jvmrow(kv: (string, string)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def propertyrow(kv: (string, string)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def classpathrow(data: (string, string)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
}
根据代码清单15,environmentpage的render方法利用从父节点environmenttab中得到的environmentlistener中的统计监控数据生成jvm运行时、spark属性、系统属性以及类路径等状态的摘要信息。以jvm运行时为例,页面渲染的步骤如下:
1) 定义jvm运行时信息、spark属性信息、系统属性信息的表格头部propertyheader和类路径信息的表格头部classpathheaders。
2) 定义jvm运行时信息的表格中每行数据的生成方法jvmrow。
3) 调用uiutils的listingtable方法生成jvm运行时信息、spark属性信息、系统属性信息、类路径信息的表格。
4) 调用uiutils的headersparkpage方法封装好css、js、header及页面布局等。
uiutils工具类的实现细节留给感兴趣的读者自行查阅,本文不多赘述。
[1]本节内容用到jettyutils中的很多方法,读者可以在附录c中找到相应的实现与说明。
关于《spark内核设计的艺术 架构设计与实现》
经过近一年的准备,基于spark2.1.0版本的《spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖链接如下:
京东:
上一篇: 006.Ceph对象存储基础使用
下一篇: sql 嵌套查询