欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Flink的Job启动JobManager端(源码分析)

程序员文章站 2022-05-03 22:37:33
通过前面的文章了解到 Driver将用户代码转换成streamGraph再转换成Jobgraph后向Jobmanager端提交 JobManager启动以后会在Dispatcher.java起来RPC方法submitJob(jobGraph),用于接收来自Driver转化得到的JobGraph来启动 ......

通过前面的文章了解到

driver将用户代码转换成streamgraph再转换成jobgraph后向jobmanager端提交

jobmanager启动以后会在dispatcher.java起来rpc方法submitjob(jobgraph),用于接收来自driver转化得到的jobgraph来启动任务

具体来看jobgraph提交到jobmanager的submitjob方法

Flink的Job启动JobManager端(源码分析)

Flink的Job启动JobManager端(源码分析)

Flink的Job启动JobManager端(源码分析)

Flink的Job启动JobManager端(源码分析)

前面都是一些调用链没有什么好讲的,最后到createjobmanager( )方法这里

Flink的Job启动JobManager端(源码分析)

先看一下1,创建了一个jobmanagerrunner并且将中driver端得到的jobgraph传递了进去

 在创建jobmanagerrunner的过程中它调用了

Flink的Job启动JobManager端(源码分析)

这里主要是为了创建一个jobmaster,在jobmaster的构造方法中

Flink的Job启动JobManager端(源码分析)

在这里它先是create传入了jobgraph然后又通过createandrestoreexecutiongraph()方法转换得到executiongraph

这个executiongraph就可以用来调度启动任务了

具体看一下他的转化逻辑

Flink的Job启动JobManager端(源码分析)

可以看到它从createexecutiongraph方法中得到了executiongraph

并且通过getcheckpointcoordinator()方法得到了一个coordinator(主要是用于周期性触发checkpoint,调用对应taskmanager的rpc生成barriers往下游发送)

继续看一下他的转化逻辑

在createexecutiongraph中通过executiongraphbuilder.buildgraph()返回了一个executiongraph

在buildgraph()方法中

Flink的Job启动JobManager端(源码分析)

创建了一个executiongraph

 Flink的Job启动JobManager端(源码分析)

为executiongraph设置一些基础信息,包括调度方式等(这里stream是eager的调度方法)

然后

Flink的Job启动JobManager端(源码分析)

1处得到了一个的拓扑图包含了所有jobgraph的所有jobvertex节点

2处就是具体遍历所有jobgraph的jobvertex生成executiongraph的顶点executionjobvertex

Flink的Job启动JobManager端(源码分析)

遍历所有jobgraph的顶点jobvertex

Flink的Job启动JobManager端(源码分析)

在这里就具体生成了executionjobvertex中的每一个executionvertex[] taskvertices

当然这里还会配置很多executiongraph的信息,就不一一列举了

配置了一些executiongraph的属性以后

调用了

Flink的Job启动JobManager端(源码分析)

可以看到我的注释,就是说这个地方其实是和coordinator的创建有关,在这个方法中

Flink的Job启动JobManager端(源码分析)

创建了一个coordinator对象

Flink的Job启动JobManager端(源码分析)

在这里注册了一个jobstatus的监听

来看一下这个监听的作用

Flink的Job启动JobManager端(源码分析)

可以看到源码上的注解就是说用于监听job状态的改变,具体监听

看到这里就非常明显了

当监听到jobstutes的状态改变时

Flink的Job启动JobManager端(源码分析)

当jobstatus变成running时调用了coordinator的.startcheckpointscheduler()方法其中

Flink的Job启动JobManager端(源码分析)

这里可以看到创建了一个周期的调度线程

看下线程的run方法

Flink的Job启动JobManager端(源码分析)

这里就真相大白了,调用了triggercheckpoint方法触发一次checkpoint(触发checkpoint的逻辑以后随缘更新到再讲)

注意,前面说到只是注册了一个监听,也就是说这个coordinator现在其实还没有启动起来的!!要到监听到jobstatus变成running才会启动

回到最开始的这里

Flink的Job启动JobManager端(源码分析)

1处转化成executiongraph以后

2处具体看一下这个startjobmanagerrunner()方法

Flink的Job启动JobManager端(源码分析)

把jobmanager启动了起来

Flink的Job启动JobManager端(源码分析)

Flink的Job启动JobManager端(源码分析)

Flink的Job启动JobManager端(源码分析)

 Flink的Job启动JobManager端(源码分析)

在其中

Flink的Job启动JobManager端(源码分析)

启动了这个jobmasterservice

Flink的Job启动JobManager端(源码分析)

在这里开启了jobmaster的一些rpc,像什么cancel job的stop job 的还有register tm的

然后startjobexecution()方法中

Flink的Job启动JobManager端(源码分析)

这里其实会向jobmanager中启动的resourcemanager的rpc请求solt信息初始化自己的的soltpool这里不细讲了,我还没有研究

后面

Flink的Job启动JobManager端(源码分析)

这个地方就是修改job状态和调度运行了

其中调用了scheduleexecutiongraph(),在其中又调用了

Flink的Job启动JobManager端(源码分析)

这个地方比较重要,在其中先

Flink的Job启动JobManager端(源码分析)

这里它就通过cas修改了jobstatue从created变成了running

修改完了以后还没完,还通过这个方法notifyjobstatuschange(),这个方法里面具体看一看

Flink的Job启动JobManager端(源码分析)

他遍历了所有的listener,也就是说会触发我们前面注册的那个coordinator的监听监听到job状态改变为running

这里coordinator就启动完成了

继续往下,在修改完job状态以后

Flink的Job启动JobManager端(源码分析)

因为流模式这里是用的eager,flink批处理我不熟这里就不展开了

在这个schduleeager方法中

Flink的Job启动JobManager端(源码分析)

然后

Flink的Job启动JobManager端(源码分析)

看到这里它创建了一个taskdeploymentdescriptor一个用于调度taskmanager端任务的tdd对象

看过前面几篇博客的同学,就应该有印象了,在taskmanager启动会启动很多的rpc接口

其中有一个

Flink的Job启动JobManager端(源码分析)

一目了然了,这个东西是用来发送给taskmanager用于启动taskmanager端任务的!!!!

到这里jobmanager端的job启动任务就差不多完成了

接下来就是taskmanager端的任务了,随缘更新的时候在说一下真正taskmanager节点是如何启动我们job任务的