Spark RPC框架源码分析(一)简述
一. spark rpc框架概述
spark是最近几年已经算是最为成功的大数据计算框架,那么这次我们就来介绍它内部的一个小点,spark rpc框架。
在介绍之前,我们需要先说明什么是rpc,引用百度百科:
rpc(remote procedure call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。rpc协议假定某些传输协议的存在,如tcp或udp,为通信程序之间携带信息数据。
spark rpc可以说是spark分布式集群的基础,若是将spark类比为一个人的话,spark rpc无疑就是它的血液部分。而在spark1.6之前,它的rpc部分还是用akka实现的,但之后底层就换成了netty来实现。为什么要这样做呢?因为啊,这样将spark和akka耦合在了一起,如果你系统本身就有使用到akka,然后又想使用spark的话,那两个akka框架版本不一致可怎么办呀,这无疑是很让人头痛的。spark团队正是考虑到了这一点,所以将akka替换成了netty。
这次我们就来看看spark是如何让它的血液流动起来的吧。有一位大神将spark rpc中的rpc部分剥离出来,弄成一个新的可运行的 rpc 项目,这个项目本身就可以当作一个简易的akka来使用,地址在这spark rpc。
虽然名字不一样,但这个项目的类和内容基本和spark core中rpc部分的代码和结构基本是一样的,这样我们就可以通过这个来学习spark rpc框架。
ps:所用spark版本:spark2.1.0
二.spark rpc中的 hello world
我们程序员学东西最喜欢从一个hello world开始,那么接下来我们就来演示如何下载并运行最简单的hello world例子吧。
首先,我使用的编译器是idea,通过idea将github上的代码clone下来。
可以看到项目目录下有两个模块,
- kraps-rpc
- kraps-rpc-example
kraps-rpc存放的是spark rpc的源代码,而我们要做的即是运行 kraps-rpc-example中的示例代码。
启动prc的话首先需要启动server端,开启监听服务,然后才能通过client进行访问。这里在helloworldserver.scala中都已经帮我们写好,不过在main方法中需要修改一下内容,就是将host改为本机地址。
def main(args: array[string]): unit = { // val host = args(0) val host = "localhost" val config = rpcenvserverconfig(new rpcconf(), "hello-server", host, 52345) val rpcenv: rpcenv = nettyrpcenvfactory.create(config) val helloendpoint: rpcendpoint = new helloendpoint(rpcenv) rpcenv.setupendpoint("hello-service", helloendpoint) rpcenv.awaittermination() }
然后我们只需要右键该文件然后执行即可。
接下来我们就需要启动client端代码,我们先到helloworldclient文件中,这里面提供了同步和异步两个方法可以运行。代码同样都已经写好,通过修改注释即可使用不同的方法运行。同样是右键点击该文件执行。
def main(args: array[string]): unit = { //异步方法 //asynccall() //同步方法 synccall() }
异步方法中,ask会返回一个future(注意这里的future是scala中的future,和java的是不一样的)。并且在future运行结果出来前,我们可以去做其他事情(异步的优势所在)。scala中的future和java的future有些不同,不过这可以先不去管,先当作java里面的future即可。
def asynccall() = { val rpcconf = new rpcconf() val config = rpcenvclientconfig(rpcconf, "hello-client") val rpcenv: rpcenv = nettyrpcenvfactory.create(config) val endpointref: rpcendpointref = rpcenv.setupendpointref(rpcaddress("localhost", 52345), "hello-service") val future: future[string] = endpointref.ask[string](sayhi("neo")) future.oncomplete { case scala.util.success(value) => println(s"got the result = $value") case scala.util.failure(e) => println(s"got error: $e") } await.result(future, duration.apply("3s")) //在future结果运行出来前,会先打印这条语句。 println("print me at first!") thread.sleep(7) }
而同步方法是直接将结果返回,并且会阻塞,这个时间内你无法做其他事情,只能等待,直到结果返回。
def synccall() = { val rpcconf = new rpcconf() val config = rpcenvclientconfig(rpcconf, "hello-client") val rpcenv: rpcenv = nettyrpcenvfactory.create(config) val endpointref: rpcendpointref = rpcenv.setupendpointref(rpcaddress("localhost", 52345), "hello-service") val result = endpointref.askwithretry[string](saybye("neo")) println(result) }
很简单是吧,运行过例子后,我们就可以来了解一些spark rpc运行过程中至关重要的两个编程模型,以及在这其中使用到的一些主要的类。
三.spark rpc中的两个编程模型以及各个类
spark rpc是使用了actor模型和reactor模型的混合模式,我们结合两种模型分别说明spark rpc中各个类的作用:
首先我们先来看spark rpc的类图。
是不是感觉很乱?没事,我们来逐步剖析各个类。
为了更加清楚了说明各个类的关系,我们要先知道两个模型,分别是actor模型和reactor模型,我们将从这两个模型的角度来拆解各个类的关系。
actor模型
其实之前也有写过一篇介绍actor模型的文章,感兴趣的同学可以点击这里查看actor模型浅析。
其实actor主要就是这副图的内容:
在spark rpc中有几个类分别与actor模型中的各个角色对应,对应如下,左边的是spark rpc中的类,右边的是actor模型中的角色:
rpcendpoint => actor
rpcendpointref => actorref
rpcenv => actorsystem
我们逐个来看:
rpcenv --rpc environment
rpc environment 是 rpcendpoint 的运行环境。它管理 rpcendpoint 的整个生命周期:
- 通过名字或 uri 注册 rpcendpoint。
- 对到底的消息进行路由,决定分发给哪个 rpcendpoint。
- 停止 rpcendpoint。
rpc environment在akka已经被移除的2.0后面版本中,rpc environment的实现类是nettyrpcenv。通常是由nettyrpcenvfactory.create创建。
rpcendpoint
rpcendpoint能通过callbacks接收消息。通常需要我们自己写一个类继承rpcendpoint。编写自己的接收信息和返回信息规则。
rpcendpoint的生命周期被rpc environment管理。其生命周期包括,onstart,receive和onstop。
它是作为服务端,比如上面例子中的helloworldserver就是一个rpcendpoint。
rpcendpointref
rpcendpointref是rpcendpoint在rpc environment中的一个引用。
它包含一个地址(即spark url)和名字。rpcendpointref作为客户端向服务端发送请求并接收返回信息,通常可以选择使用同步或异步的方式进行发送。
reactor模型
spark rpc采用actor模型和reactor模型混合的结构,上面已经介绍了actor,那么现在我们就来介绍reactor模型,同样,我们可以从一张图来看reactor的架构。
使用reactor模型,由底层netty创建的eventloop做i/o多路复用,这里使用multiple reactors这种形式,如上图所示,从netty的角度而言,main reactor和sub reactor对应bossgroup和workergroup的概念,前者负责监听tcp连接、建立和断开,后者负责真正的i/o读写。
而图中的threadpool就是的dispatcher中的线程池,它来解耦开来耗时的业务逻辑和i/o操作,这样就可以更scalabe,只需要少数的线程就可以处理成千上万的连接,这种思想是标准的分治策略,offload非i/o操作到另外的线程池。
dispatcher
dispatcher的主要作用是保存注册的rpcendpoint、分发相应的message到rpcendpoint中进行处理。dispatcher即是上图中threadpool的角色。它同时也维系一个threadpool,用来处理每次接受到的inboxmessage。而这里处理inboxmessage是通过inbox实现的。
inbox
inbox其实属于actor模型,是actor中的信箱,不过它和dispatcher联系紧密所以放这边。
inboxmessage有多个实现它的类,比如onewaymessage,rpcmessage,等等。dispatcher会将接收到的inboxmessage分发到对应rpcendpoint的inbox中,然后inbox便会处理这个inboxmessage。
ok,这次就先介绍到这里,下次我们从代码的角度来看spark rpc的运行机制
如果觉得对你有帮助,不妨关注一波吧~~
参考资料:
推荐阅读:
从分治算法到 mapreduce
actor并发编程模型浅析
大数据存储的进化史 --从 raid 到 hadoop hdfs
上一篇: 在大街上无聊的晃荡着
下一篇: 快过年了