欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark源码系列1--------spark作业提交源码流程分析

程序员文章站 2022-07-12 17:13:39
...

java  SparkSubmit -xxx -xxx -xxx
提交语句会在client上开启一个提交的进程
此处只是spark在yarn cluster模式下主要的提交流程框架,主要过程如下:
1、通过启动SparkSubmit进程,内部反射运行Client类的main方法;
2、client主要是根yarn集群的rm进行交互,主要是向rm传送启动am进程的启动命令以及参数,在yarn的nm节点上启动ApplicationMaster;
3、ApplicationMaster进程启动之后反射启动用户提交的spark程序,也就是driver,注意这里是通过单独开一个线程来反射启动的;
ApplicationMaster还有一个重要的事情就是跟yarn的rm进行通信交互,向yarn主次并且获取资源,之后分配资源,分配资源考虑节点数据以及算法的优化,
之后通过线程池向分配的container发送启动executorBackend的进程启动command来启动CoarseGrainedExecutorBackend进程;
4、CoarseGrainedExecutorBackend进程main方法做的事情主要是跟driver通信,向driver注册之后进行一系列通信,启动executor并且根据通信状态执行具体的task;
5、task分配涉及到后续的driver的任务分解,stage确定,以及分区task分配的工作了。
 

JVM  -Process (SparkSubmit)  main方法
1 SparkSubmit
    //启动进程
    --main
        //封装进程启动的xxx参数
        --new SparkSubmitArguments
        //提交
        --submit
            //准备提交环境(返回的是自己spark程序的mainClass以及args等信息)
            --prepareSubmitEnvironment
                //cluster提交模式
                --childMainClass = "org.apache.spark.deploy.yarn.Client"
                //client提交模式
                --childMainClass = args.mainClass(自己打包spark程序的主类)
            //参数就是prepareSubmitEnvironment返回的结果
            --doRunMain (runMain)
                //反射加载类
                --Utils.classForName(childMainClass)
                //查找main方法
                --mainClass.getMethod("main",new Array[String](0).getClass)
                //调用main方法
                --mainMethod.invoke

                
2 Client
    --main
        --new ClientArguments(argsString)
        --new Client
            //连接yarn的客户端,内部参数是yarn集群resourceManager的address等信息
            --yarnClient = YarnClient.createYarnClient
        --client.run()
            //产生applicationId
            --submitApplication
                //封装指令 command = /bin/java org.apache.spark.deploy.yarn.ApplicationMaster
                --createContainerLaunchContext
                --createApplicationSubmissionContext
                //向yarn提交应用
                --yarnClient.submitApplication(appcontext)
                
            
3 ApplicationMaster(yarnNM中启动)
    //启动进程
    --main
        --new ApplicationMasterArguments(args)
        //创建应用管理器对象
        --new ApplicationMaster
        --master.run()
            //Cluster
            --runDriver
                //启动用户应用
                --startUserApplication
                    //获取用户类的main方法
                    --userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
                    //启动线程,执行用户类的main方法,线程的名字就是driver!!!
                    --new Thread().start()
                //注册AM
                --registerAM
                    //获取yarn资源
                    --client.register
                    //分配资源
                    --allocator.allocateResources()
                        //考虑数据文件节点的本地化
                        --handleAllocatedContainers
                            //启动cachedThreadPool运行
                            --runAllocatedContainers
                                --new ExecutorRunnable().run()
                                    //rpc跟nm发送启动进程的指令, command = /bin/java  org.apache.spark.executor.CoarseGrainedExecutorBackend
                                    --startContainer
                                    
4 CoarseGrainedExecutorBackend
    --main
        --run
            --onStart
                //ref为CoarseGrainedExecutorBackend类中的Driver的rpc通信的句柄
                --ref.ask[Boolean](RegisterExecutor)
                
            --receive
                //rpc通信模式匹配executor后台的操作
                --case RegisteredExecutor
                    --new Executor
                --case launchTask
                    --executor.launchTask

 

相关标签: 分布式计算