Flink的Job启动Driver端(源码分析)
整个flink的job启动是通过在driver端通过用户的envirement的execute()方法将用户的算子转化成streamgraph
然后得到jobgraph通过远程rpc将这个jobgraph提交到jobmanager对应的接口
jobmanager转化成executiongraph.deploy(),然后生成tdd发给taskmanager,然后整个job就启动起来了
这里来看一下driver端的实现从用户的envirement.execute()方法作为入口
这里的envirement分为
remotestreamenvironment
localstreamenvironment
因为local模式比较简单这里就不展开了,主要是看下remotestreamenvironment的execute方法
可以看到这里先获取到了streamgraph,具体获取的实现
这里传入了一个transformations其中就包含了我们用户的所有operator
这个地方就是遍历了用户端所有的operator生成streamgraph,遍历的每一个算子具体转化成streamgraph的逻辑
1处会递归遍历input直到input已经transfor,然后拿到了上游的ids
然后将operator加入到了streamgraph中调用addnode()方法将operator作为一个node,包含了一些信息,上下游的类型,并行度,soltgroup
最后遍历上游的ids,创建边添加到streamgraph
到这里streamgraph就创建完成了
回到最开始的地方,创建完streamgraph以后,会将streamgraph传入executeremotely(streamgraph, jarfiles)这个方法,这里就是streamgraph转化成jobgraph的逻辑
其中创建了一个restclusterclient
可以看到这里,通过getjobgraph方法将streamgraph转换成了jobgraph
然后就submitjob将这个jobgraph提交jobmanager了
先看一下streamgraph如何转化成jobgraph的
通过getjobgraph方法然后
这个createjobgraph方法是主要的转化逻辑
广度优先遍历为所有streamgraph的node 即operator生成hash散列值,为什么要生成这个operator的hash?
因为这个hash需要作为每一个operator的唯一标示,标示每一个operator用于cp的恢复,当用户代码没有修改时,这个hash值是不会改变的
接下来
这里会将flink中上下游的operator操作根据是否满足chain条件链在一起,在createchian中
这个ischainable()方法就是是否可以chain的判断条件
1.下游的输入边只有一条
2.下游操作operator不为空
3.上游操作operator不为空
4.上游必须有相同的solt组
5.下游chain策略为always
6.上游chain策略为head或上游chain策略为always
7.forwardpartition的边
8.上下游并行度相同
9.用户代码设置的operator是否可以chian
将可以chain的streamnode 链在一起以后就可以创建成为jobgraph的jobvertex了
然后通过restclusterclient会将这个jobgraph往jobmanager的dispatcher对应的rpc接口上面发送
整个job的启动driver端的任务就结束了
总结:
在driver端用户的算子会被创建成为streamgraph,其中包含了一些边,角,上下游类型,并行度等一些信息
然后将streamgraph通过一些chain条件将可以chain的顶点chain在了一起转化成了jobgraph
streamedge变成了jobedge,chain在一起的streamnode变成了jobvertex
最后然后通过rpc将整个jobgraph向jobmanager提交。
上一篇: Python—包管理工具与上传工具
下一篇: Mysql使用SSL连接