欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Flink中TaskManager端执行用户逻辑过程(源码分析)

程序员文章站 2022-07-04 23:37:08
TaskManager接收到来自JobManager的jobGraph转换得到的TDD对象,启动了任务,在StreamInputProcessor类的processInput()方法中 通过一个while(true)中不停的拉取上游的数据,然后调用streamOperator.processElem ......

taskmanager接收到来自jobmanager的jobgraph转换得到的tdd对象,启动了任务,在streaminputprocessor类的processinput()方法中

通过一个while(true)中不停的拉取上游的数据,然后调用streamoperator.processelement(record)调用用户实现的方法去处理数据拉取的数据

首先先来看下这个operator对象

Flink中TaskManager端执行用户逻辑过程(源码分析)

然后看看oneinputstreamoperator类的uml

这里所有的实现类没有全部列出,只列了一些代表

Flink中TaskManager端执行用户逻辑过程(源码分析)

看到这里,写过flink的streamapi的同学,肯定感觉到很熟悉!!!!!!

这里!不就是我们常写flink代码的那些算子嘛

对没有错,我们程序中实现的那些算子逻辑,最后都会被封装成一个oneinputstreamoperator,这里具体看一个最熟悉的fliter

来看一下streamfilter的processelement方法

Flink中TaskManager端执行用户逻辑过程(源码分析)

!!!这里传入一个数据后,这个userfunction调用了filter方法并且把数据放进去了

当返回true通过这个output.collect发送出去了

这不就对应了我们用户自己实现的filter算子嘛,没错这个方法其实就是客户端的filter方法,这个userfunction包含了用户实现filter算子的逻辑

(!!!!!就是说这个processelement方法会调用用户的逻辑)

(所以这个userfunction可以带上client的方法实现,这对我们很重要,特别是对flink源码修改,为clientapi添加新功能方法,运行时可以通过这里拿到)

继续

来看看这个output.collect()方法

Flink中TaskManager端执行用户逻辑过程(源码分析)

然后

 Flink中TaskManager端执行用户逻辑过程(源码分析)

看到这个,等等等等

我不是从这个processelement()方法进来的吗,怎么又开始调processelement()方法了

难道递归了? 不对不对

这里operator不是上一个operator了,而是这个output对象的(这里是chainoutput)

看下这个output对象

Flink中TaskManager端执行用户逻辑过程(源码分析)

看下uml类图,也是只列举了重要的

Flink中TaskManager端执行用户逻辑过程(源码分析)

先看chainingoutput的属性

 Flink中TaskManager端执行用户逻辑过程(源码分析)

发现了又出现了oneinputstreamoperator对象

看到这个实现类的名字!chain联想起了什么

flink会将可以chain在一起的算子在streamgraph转换成jobgraph的时候根据条件chain在一起

一惊!

来分别看一下chainingoutput和recordwriteroutput的collect()方法有什么区别

在chain中

Flink中TaskManager端执行用户逻辑过程(源码分析)

 在recordwriter中

Flink中TaskManager端执行用户逻辑过程(源码分析)

这里chain的ouput,又继续调用了下一个operator的processelement方法,然后又在processelement方法中又调用output.collect( ),collect中又调用了下一个operator的processelement方法

整个过程就是个无限的循环,直到,某一个operator的ouput不为chainingoutput,当变为recordwriteroutput时

上面看到recordwriteroutput的processelement直接emit发送出去了这个数据,再也没有继续调用processelement方法了

这里也就对应了,flink中的责任链,chain在一起的算子会一个接着一个执行,直到无法chain,就会往下游发送emit了

来看一下uml类图帮助理解

Flink中TaskManager端执行用户逻辑过程(源码分析)

 

 里中有我,我中有你,一直相互调用直到无法chain,然后emit往下游发送(这里肯定就有发送端的反压逻辑,以后随缘更新)

那这里的循环调用理解了就会想,那如何确定第一个operator调用,然后进入整个调用链呢

回到taskmanager接收到jobmanager的tdd以后初始化整个任务的时候

streamtask.java中invoke方法中

Flink中TaskManager端执行用户逻辑过程(源码分析)

 先是初始化了一个operatorchain,里面其实就是一个数组streamoperator

Flink中TaskManager端执行用户逻辑过程(源码分析)

在他初始化的时候,其实就是为我们所有的streamoutputs设置了他的output以及会根据jobmanager发送过来的tdd(包含信息)

设置成对应的chainingoutput还是recordwriteroutput,chainoutput会设置他的的operator

然后获取了getheadoperator()其实就是获取了他调用连中的第一个

然后在

Flink中TaskManager端执行用户逻辑过程(源码分析)

 Flink中TaskManager端执行用户逻辑过程(源码分析)

将这个第一个operator关联到了inputprocessor对象里面

后面就简单了在inputprocessor.processinput中就进入了while(true)循环拉取上游数据的逻辑

然后

Flink中TaskManager端执行用户逻辑过程(源码分析)

在这里调用的第一个processelement方法就是我们的那个headoperator

这样整个调用责任链就开始从第一个operator运行起来了