欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

十九、Flink源码阅读--TaskManager启动过程

程序员文章站 2022-07-14 13:51:50
...

本篇我们接这上篇JobManager启动后,来看下TaskManager启动前后的源码。

TM的启动入口类为:org.apache.flink.runtime.taskexecutor.TaskManagerRunner

源码分析
public static void main(String[] args) throws Exception {
	... 省略
	
	try {
		SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
			@Override
			public Void call() throws Exception {
				runTaskManager(configuration, ResourceID.generate());//启动入口
				return null;
			}
		});
	} catch (Throwable t) {
		LOG.error("TaskManager initialization failed.", t);
		System.exit(STARTUP_FAILURE_RETURN_CODE);
	}
}

这里ResourceID.generate()会生成一个类似uuid的值,我们看下runTaskManager方法

public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
	final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);
	taskManagerRunner.start();
}

===>

public void start() throws Exception {
	taskManager.start();
}

===>

public void start() throws Exception {
	super.start();

	// start by connecting to the ResourceManager
	try {
		resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());//和resourceManager连接
	} catch (Exception e) {
		onFatalError(e);
	}

	// tell the task slot table who's responsible for the task slot actions
	taskSlotTable.start(new SlotActionsImpl());

	// start the job leader service
	jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());//任务监控服务启动

	fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());//任务执行时的文件缓存实现,由blobService管理

	startRegistrationTimeout();//启动注册taskslot,超时等
}

taskSlotTable中包含一个timeService(用于超时分配slot)和一个taskSlots(TaskSlot的数组

TM启动的步骤就这么多。