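FATE Study: Reading the Source Code by Following the Logs (11): Requests Generated During an upload Task
Overview
The HTTP requests of an upload task fall into two groups.
The first group is produced by the flow from submit to job finish.
The second group is produced by polling: on each round these requests only collect information and do not execute anything.
Execution details
The log below is the complete container log from one upload run.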
10.200.96.235 - - [26/Jul/2021 08:20:31] "POST /v1/party/202107260820309976351/local/0/create HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:31] "POST /v1/data/upload?%7B%22file%22:%20%22/data/projects/fate/examples/data/breast_hetero_guest.csv%22,%20%22head%22:%201,%20%22partition%22:%201,%20%22work_mode%22:%201,%20%22table_name%22:%20%22hetero_guest%22,%20%22namespace%22:%20%22cl%22,%20%22config%22:%20%22/data/projects/fate/cl/upload_guest.json%22,%20%22function%22:%20%22upload%22%7D HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/resource/apply HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/start HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/running HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/start HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:33] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:34] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:36] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:38] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:40] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:41] "POST /v1/party/202107260820309976351/local/0/update HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:42] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/output_data_info/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_data/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_meta/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:44] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:45] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:45] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
static conf path: /data/projects/fate/eggroll/conf/eggroll.properties
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/model HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/status/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/clean HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/clean HTTP/1.1" 200 -
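The entries whose endpoint is collect come from the second group. Apart from the last entry, the gaps between them match the polling interval.
All other entries belong to the first group.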
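The split between the two groups can be checked directly against the log. The following is a minimal Python sketch, not part of FATE. It assumes the access-log format shown above, and the helper names parse_line and collect_intervals are made up for illustration. It picks out the collect requests and computes the gaps between their timestamps.

```python
import re
from datetime import datetime

# Access-log format as it appears in the log above:
# <ip> - - [<dd/Mon/yyyy HH:MM:SS>] "<METHOD> <path> HTTP/1.1" <status> -
LOG_RE = re.compile(
    r'^\S+ - - \[(?P<ts>[^\]]+)\] "(?P<method>\w+) (?P<path>\S+) HTTP/[\d.]+" \d{3}'
)


def parse_line(line):
    """Return (timestamp, path) for a request line, or None for any other output."""
    m = LOG_RE.match(line)
    if m is None:
        return None  # e.g. the "static conf path: ..." line
    return datetime.strptime(m["ts"], "%d/%b/%Y %H:%M:%S"), m["path"]


def collect_intervals(lines):
    """Seconds between consecutive requests whose path ends in /collect."""
    parsed = [p for p in map(parse_line, lines) if p is not None]
    stamps = [ts for ts, path in parsed if path.endswith("/collect")]
    return [int((b - a).total_seconds()) for a, b in zip(stamps, stamps[1:])]
```

The collect entries in the log above are stamped 08:20:34, 36, 38, 40, 42 and 45, so the gaps are 2, 2, 2, 2 and 3 seconds: only the last one differs.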
HTTP requests for job submission
Each request is described below in order, with its purpose and its function call chain.
1. data/upload:
Submit the job.
fate_flow_client.py -> fate_flow_server.py -> data_access_app_manager.py -> DAGScheduler.submit
2. party/<job_id>/<role>/<party_id>/create:
Create the job (continuing from the call chain above).
DAGScheduler.submit -> FederatedScheduler.create_job -> fate_flow_server.py -> party_app.py -> JobController.create_job
3. party/<job_id>/<role>/<party_id>/resource/apply:
Apply for resources for the job.
DAGScheduler.schedule_waiting_jobs -> FederatedScheduler.resource_for_job -> fate_flow_server.py -> party_app.py -> ResourceManager.apply_for_job_resource
4. party/<job_id>/<role>/<party_id>/start:
Start the job.
DAGScheduler.schedule_waiting_jobs -> DAGScheduler.start_job -> FederatedScheduler.start_job(job=job) -> fate_flow_server.py -> party_app.py -> JobController.start_job
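The data/upload entry in the log is hard to read because its parameters appear as percent-encoded JSON after the `?`. As a small aid, and not as a description of how FATE itself parses the request, the sketch below decodes that request target with the Python standard library. The function name decode_upload_target is made up for this example.

```python
import json
from urllib.parse import unquote


def decode_upload_target(request_target):
    """Split '/v1/data/upload?<percent-encoded JSON>' into (path, dict)."""
    path, _, query = request_target.partition("?")
    return path, json.loads(unquote(query))


# Request target copied from the data/upload entry in the log above.
target = (
    "/v1/data/upload?%7B%22file%22:%20%22/data/projects/fate/examples/data/"
    "breast_hetero_guest.csv%22,%20%22head%22:%201,%20%22partition%22:%201,"
    "%20%22work_mode%22:%201,%20%22table_name%22:%20%22hetero_guest%22,"
    "%20%22namespace%22:%20%22cl%22,%20%22config%22:%20%22/data/projects/fate/"
    "cl/upload_guest.json%22,%20%22function%22:%20%22upload%22%7D"
)
path, params = decode_upload_target(target)
for key, value in params.items():
    print(f"{key}: {value}")
```

Decoded this way, the entry shows the uploaded file, the target table (table_name hetero_guest in namespace cl) and the config file that the client passed along.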
Steps 5-7 below start the task and update the job status in turn.
5. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/status/running:
Update the task status.
DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.start_task -> FederatedScheduler.sync_task_status -> fate_flow_server.py -> party_app.py -> TaskController.update_task_status
6. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/start:
Start the task, immediately after the FederatedScheduler.sync_task_status call above.
DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.start_task -> FederatedScheduler.start_task -> fate_flow_server.py -> party_app.py -> TaskController.start_task
7. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
Report the task status back.
TaskExecutor.run_task -> TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task
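From here on the paths are task-level rather than job-level: compared with the job endpoints, they carry three extra segments (component name, task id, task version). The sketch below rebuilds both kinds of path so the difference is easy to see. It is illustrative only: the function names are made up, the segments `local` and `0` are assumed to be the role and party id, and the rule task_id = "<job_id>_<component_name>" is read off this one log rather than taken from the FATE source.

```python
def job_endpoint(job_id, role, party_id, action):
    """Job-level path: job id, role, party id, then the action."""
    return f"/v1/party/{job_id}/{role}/{party_id}/{action}"


def task_endpoint(job_id, component_name, task_version, role, party_id, action):
    """Task-level path: adds component name, task id and task version."""
    # Assumption taken from this log only: task_id == "<job_id>_<component_name>".
    task_id = f"{job_id}_{component_name}"
    return (
        f"/v1/party/{job_id}/{component_name}/{task_id}/{task_version}"
        f"/{role}/{party_id}/{action}"
    )


job_id = "202107260820309976351"
print(job_endpoint(job_id, "local", 0, "start"))
print(task_endpoint(job_id, "upload_0", 0, "local", 0, "status/running"))
```

The two printed paths correspond to the job start and task status/running entries in the log.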
Requests 8-11 below are all issued while the upload task itself is running.
8. party/<job_id>/<role>/<party_id>/update:
Update the job status.
Upload.save_data_table -> ControllerClient.update_job -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> JobController.update_job
9. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/output_data_info/save:
Save output_data_info.
Upload.save_data_table -> TrackerClient.log_output_data_info -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.insert_output_data_info_into_db
10. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/metric_data/save:
Save the metrics.
Upload.save_data_table -> TrackerClient.log_metric_data -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.save_metric_data
11. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/metric_meta/save:
Save the metric metadata.
Upload.save_data_table -> TrackerClient.log_metric_meta -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.save_metric_meta
Back in task_executor, the finally block runs and updates the task status.
12. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
TaskExecutor.run_task -> TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task
Once TaskExecutor.run_task() has finished, TaskExecutor.report_task_update_to_driver() is called again.
13. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task
Tip: by this round of schedule_running_job, the task status is already success.
14. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/status/success:
DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> FederatedScheduler.sync_task_status -> fate_flow_server.py -> party_app.py -> TaskController.update_task_status
Tip: because the task has succeeded, the scheduler moves on to the next stage.
15. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/stop/success:
DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> FederatedScheduler.stop_task (right after the sync_task_status call in the previous step) -> fate_flow_server.py -> party_app.py -> TaskController.stop_task
16. party/<job_id>/<role>/<party_id>/model:
DAGScheduler.schedule_running_job -> FederatedScheduler.save_pipelined_model -> fate_flow_server.py -> party_app.py -> JobController.save_pipelined_model
17. party/<job_id>/<role>/<party_id>/status/success:
DAGScheduler.schedule_running_job -> FederatedScheduler.sync_job_status -> fate_flow_server.py -> party_app.py -> JobController.update_job_status
18. party/<job_id>/<role>/<party_id>/stop/success:
DAGScheduler.schedule_running_job -> DAGScheduler.finish -> DAGScheduler.stop_job -> FederatedScheduler.stop_job -> fate_flow_server.py -> party_app.py -> JobController.stop_jobs
19. party/<job_id>/<role>/<party_id>/clean:
DAGScheduler.schedule_running_job -> DAGScheduler.finish -> FederatedScheduler.clean_job -> fate_flow_server.py -> party_app.py -> JobController.clean_job
The stop and clean requests are then issued once more so that everything is cleaned up. See "FATE Study: Reading the Source Code by Following the Logs (9): the job finish stage of an upload task".
20. party/<job_id>/<role>/<party_id>/stop/success:
DAGScheduler.schedule_running_job -> DAGScheduler.finish -> DAGScheduler.stop_job -> FederatedScheduler.stop_job -> fate_flow_server.py -> party_app.py -> JobController.stop_jobs
21. party/<job_id>/<role>/<party_id>/clean:
DAGScheduler.schedule_running_job -> DAGScheduler.finish -> FederatedScheduler.clean_job -> fate_flow_server.py -> party_app.py -> JobController.clean_job
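As a reading aid, the endpoint-to-handler pairs described in this article can be collected into one lookup table and matched against the request paths in the log. The sketch below is not FATE code. The handler names are the ones given in the call chains (including collect, which belongs to the polling part of this article). The template placeholders, in particular `<role>`, `<status>` and `<stop_status>`, and the helper names are assumptions made for this example.

```python
import re

# Route templates paired with the handler named in this article.
JOB = "/v1/party/<job_id>/<role>/<party_id>"
TASK = "/v1/party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>"
TRACKER = "/v1/tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>"

ROUTES = [
    (JOB + "/create", "JobController.create_job"),
    (JOB + "/resource/apply", "ResourceManager.apply_for_job_resource"),
    (JOB + "/start", "JobController.start_job"),
    (JOB + "/update", "JobController.update_job"),
    (JOB + "/model", "JobController.save_pipelined_model"),
    (JOB + "/status/<status>", "JobController.update_job_status"),
    (JOB + "/stop/<stop_status>", "JobController.stop_jobs"),
    (JOB + "/clean", "JobController.clean_job"),
    (TASK + "/status/<status>", "TaskController.update_task_status"),
    (TASK + "/start", "TaskController.start_task"),
    (TASK + "/report", "TaskController.update_task"),
    (TASK + "/stop/<stop_status>", "TaskController.stop_task"),
    (TASK + "/collect", "TaskController.collect_task"),
    (TRACKER + "/output_data_info/save", "Tracker.insert_output_data_info_into_db"),
    (TRACKER + "/metric_data/save", "Tracker.save_metric_data"),
    (TRACKER + "/metric_meta/save", "Tracker.save_metric_meta"),
]


def compile_template(template):
    """Turn each '<name>' placeholder into a named group matching one path segment."""
    return re.compile(re.sub(r"<(\w+)>", r"(?P<\1>[^/]+)", template))


COMPILED = [(compile_template(template), handler) for template, handler in ROUTES]


def handler_for(path):
    """Return (handler name, captured fields) for a request path, or None."""
    for pattern, handler in COMPILED:
        m = pattern.fullmatch(path)
        if m is not None:
            return handler, m.groupdict()
    return None


print(handler_for("/v1/party/202107260820309976351/local/0/clean"))
```

Because every placeholder matches exactly one path segment and the whole path must match, job-level and task-level templates cannot be confused: they differ in the number of segments.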
The polling part
party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/collect:
DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.collect_task_of_all_party -> FederatedScheduler.collect_task -> party_app.py -> TaskController.collect_task
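The pattern behind these collect requests is ordinary polling: ask for the current state at a fixed interval and stop once the task has ended. The sketch below shows that pattern in isolation and makes no attempt to reproduce FATE's scheduler. The collect callable, the END_STATUSES set and the function name are hypothetical, and the 2-second default is only inferred from the timestamps in the log above.

```python
import time

# Hypothetical set of end states for this sketch, not taken from FATE.
END_STATUSES = {"success", "failed", "canceled", "timeout"}


def poll_until_done(collect, interval=2.0, sleep=time.sleep, max_polls=100):
    """Call collect() every `interval` seconds until it reports an end status.

    `collect` stands in for the collect request: it only reads the task
    status and never runs the task itself. Returns the statuses seen, in order.
    """
    seen = []
    for _ in range(max_polls):
        status = collect()
        seen.append(status)
        if status in END_STATUSES:
            break
        sleep(interval)  # wait before the next polling round
    return seen


# Simulated task: reports "running" four times, then "success".
replies = iter(["running"] * 4 + ["success"])
print(poll_until_done(lambda: next(replies), sleep=lambda seconds: None))
```

Passing sleep as a parameter keeps the example fast to run and easy to test. In a real loop the default time.sleep would be used.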