十九、Flink源码阅读--TaskManager启动过程
程序员文章站
2022-07-14 13:51:50
...
本篇我们接这上篇JobManager启动后,来看下TaskManager启动前后的源码。
TM的启动入口类为:org.apache.flink.runtime.taskexecutor.TaskManagerRunner
源码分析
public static void main(String[] args) throws Exception {
... 省略
try {
SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
@Override
public Void call() throws Exception {
runTaskManager(configuration, ResourceID.generate());//启动入口
return null;
}
});
} catch (Throwable t) {
LOG.error("TaskManager initialization failed.", t);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
}
这里ResourceID.generate()会生成一个类似uuid的值,我们看下runTaskManager方法
public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);
taskManagerRunner.start();
}
===>
public void start() throws Exception {
taskManager.start();
}
===>
public void start() throws Exception {
super.start();
// start by connecting to the ResourceManager
try {
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());//和resourceManager连接
} catch (Exception e) {
onFatalError(e);
}
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());//任务监控服务启动
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());//任务执行时的文件缓存实现,由blobService管理
startRegistrationTimeout();//启动注册taskslot,超时等
}
taskSlotTable中包含一个timeService(用于超时分配slot)和一个taskSlots(TaskSlot的数组
TM启动的步骤就这么多。