Flink中TaskManager端执行用户逻辑过程(源码分析)
taskmanager接收到来自jobmanager的jobgraph转换得到的tdd对象,启动了任务,在streaminputprocessor类的processinput()方法中
通过一个while(true)中不停的拉取上游的数据,然后调用streamoperator.processelement(record)调用用户实现的方法去处理数据拉取的数据
首先先来看下这个operator对象
然后看看oneinputstreamoperator类的uml
这里所有的实现类没有全部列出,只列了一些代表
看到这里,写过flink的streamapi的同学,肯定感觉到很熟悉!!!!!!
这里!不就是我们常写flink代码的那些算子嘛
对没有错,我们程序中实现的那些算子逻辑,最后都会被封装成一个oneinputstreamoperator,这里具体看一个最熟悉的fliter
来看一下streamfilter的processelement方法
!!!这里传入一个数据后,这个userfunction调用了filter方法并且把数据放进去了
当返回true通过这个output.collect发送出去了
这不就对应了我们用户自己实现的filter算子嘛,没错这个方法其实就是客户端的filter方法,这个userfunction包含了用户实现filter算子的逻辑
(!!!!!就是说这个processelement方法会调用用户的逻辑)
(所以这个userfunction可以带上client的方法实现,这对我们很重要,特别是对flink源码修改,为clientapi添加新功能方法,运行时可以通过这里拿到)
继续
来看看这个output.collect()方法
然后
看到这个,等等等等
我不是从这个processelement()方法进来的吗,怎么又开始调processelement()方法了
难道递归了? 不对不对
这里operator不是上一个operator了,而是这个output对象的(这里是chainoutput)
看下这个output对象
看下uml类图,也是只列举了重要的
先看chainingoutput的属性
发现了又出现了oneinputstreamoperator对象
看到这个实现类的名字!chain联想起了什么
flink会将可以chain在一起的算子在streamgraph转换成jobgraph的时候根据条件chain在一起
一惊!
来分别看一下chainingoutput和recordwriteroutput的collect()方法有什么区别
在chain中
在recordwriter中
这里chain的ouput,又继续调用了下一个operator的processelement方法,然后又在processelement方法中又调用output.collect( ),collect中又调用了下一个operator的processelement方法
整个过程就是个无限的循环,直到,某一个operator的ouput不为chainingoutput,当变为recordwriteroutput时
上面看到recordwriteroutput的processelement直接emit发送出去了这个数据,再也没有继续调用processelement方法了
这里也就对应了,flink中的责任链,chain在一起的算子会一个接着一个执行,直到无法chain,就会往下游发送emit了
来看一下uml类图帮助理解
里中有我,我中有你,一直相互调用直到无法chain,然后emit往下游发送(这里肯定就有发送端的反压逻辑,以后随缘更新)
那这里的循环调用理解了就会想,那如何确定第一个operator调用,然后进入整个调用链呢
回到taskmanager接收到jobmanager的tdd以后初始化整个任务的时候
streamtask.java中invoke方法中
先是初始化了一个operatorchain,里面其实就是一个数组streamoperator
在他初始化的时候,其实就是为我们所有的streamoutputs设置了他的output以及会根据jobmanager发送过来的tdd(包含信息)
设置成对应的chainingoutput还是recordwriteroutput,chainoutput会设置他的的operator
然后获取了getheadoperator()其实就是获取了他调用连中的第一个
然后在
将这个第一个operator关联到了inputprocessor对象里面
后面就简单了在inputprocessor.processinput中就进入了while(true)循环拉取上游数据的逻辑
然后
在这里调用的第一个processelement方法就是我们的那个headoperator
这样整个调用责任链就开始从第一个operator运行起来了