Apollo源码分析学习笔记
Apollo 源码剖析学习笔记
Apollo 项目介绍
Cyber RT 代码分析
cyber base
[email protected]:~/study/apollo/cyber/base$ tree
.
├── atomic_hash_map.h
├── atomic_hash_map_test.cc
├── atomic_rw_lock.h
├── atomic_rw_lock_test.cc
├── bounded_queue.h
├── bounded_queue_test.cc
├── build
├── BUILD
├── CMakeLists.txt
├── concurrent_object_pool.h
├── for_each.h
├── for_each_test.cc
├── macros.h
├── main_test.cc
├── object_pool.h
├── object_pool_test.cc
├── reentrant_rw_lock.h
├── rw_lock_guard.h
├── signal.h
├── signal_test.cc
├── thread_pool.h
├── thread_safe_queue.h
├── unbounded_queue.h
├── unbounded_queue_test.cc
└── wait_strategy.h
1 directory, 24 files
测试的CMakeLists.txt
cmake_minimum_required(VERSION 3.5)
project(CyberBase)
set(CMAKE_CXX_STANDARD 17)
set(PROJECT_SOURCE_DIR "../../")
INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR})
message(${PROJECT_SOURCE_DIR})
add_executable(atomic_hash_map_test main_test.cc atomic_hash_map_test.cc)
target_link_libraries(atomic_hash_map_test -lgtest -lpthread)
#target_link_libraries(signal_test Cyber::signal)
build目录为代码生成测试目录
cyber 入口
cyber 的入口在"cyber/mainboard"目录中:
[email protected]:~/study/apollo/cyber/mainboard$ tree
.
├── BUILD
├── mainboard.cc //主函数
├── module_argument.cc //模块输入参数
├── module_argument.h
├── module_controller.cc //模块加载,卸载
└── module_controller.h
0 directories, 6 files
mainboard 中的文件比较少,也很好理解,我们先从"mainboard.cc"中开始分析:
#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/init.h"
#include "cyber/mainboard/module_argument.h"
#include "cyber/mainboard/module_controller.h"
#include "cyber/state.h"
using apollo::cyber::mainboard::ModuleArgument;
using apollo::cyber::mainboard::ModuleController;
int main(int argc, char **argv)
{
// parse the argument 解析参数
ModuleArgument module_args;
module_args.ParseArgument(argc, argv);
// initialize cyber 初始化 cyber
apollo::cyber::Init(argv[0]);
// start module 加载模块
ModuleController controller(module_args);
if (!controller.Init())
{
controller.Clear();
AERROR << "module start error.";
return -1;
}
// 等待 cyber 关闭
apollo::cyber::WaitForShutdown();
controller.Clear();
AINFO << "exit mainboard.";
return 0;
}
上述是“mainboard.cc”的主函数,下面我们重点介绍下具体的过程。
解析参数
解析参数是在“ModuleArgument”类中实现的,主要是解析加载 DAG 文件时候带的参数。
void ModuleArgument::DisplayUsage()
{
AINFO << "Usage: \n " << binary_name_ << " [OPTION]...\n"
<< "Description: \n"
<< " -h, --help : help information \n"
<< " -d, --dag_conf=CONFIG_FILE : module dag config file\n"
<< " -p, --process_group=process_group: the process "
"namespace for running this module, default in manager process\n"
<< " -s, --sched_name=sched_name: sched policy "
"conf for hole process, sched_name should be conf in cyber.pb.conf\n"
<< "Example:\n"
<< " " << binary_name_ << " -h\n"
<< " " << binary_name_ << " -d dag_conf_file1 -d dag_conf_file2 "
<< "-p process_group -s sched_name\n";
}
void ModuleArgument::ParseArgument(const int argc, char *const argv[])
{
// 二进制模块名称
binary_name_ = std::string(basename(argv[0]));
//解析参数
GetOptions(argc, argv);
// 如果没有 process_group_和 sched_name_,则赋值为虚认值
if (process_group_.empty())
{
process_group_ = DEFAULT_process_group_;
}
//如果有,则设置对应的参数
if (sched_name_.empty())
{
sched_name_ = DEFAULT_sched_name_;
}
GlobalData::Instance()->SetProcessGroup(process_group_);
GlobalData::Instance()->SetSchedName(sched_name_);
AINFO << "binary_name_ is " << binary_name_ << ", process_group_ is "
<< process_group_ << ", has " << dag_conf_list_.size() << " dag conf";
//打印 dag_conf配置,这里的 dag 是否可以设置多个?
for (std::string &dag : dag_conf_list_)
{
AINFO << "dag_conf: " << dag;
}
}
模块加载
在“ModuleController”实现 cyber 模块的加载在“ModuleController:Init0”中调用“LoadAlI() ”来加载所有模块,我们接着看 cyber 是如何加载模块。
首先是找到模块的路径。
if (module_config.module_library().front() == '/')
{
load_path = module_config.module_library();
}
else
{
load_path =
common::GetAbsolutePath(work_root, module_config.module_library());
}
if (!common::PathExists(load_path))
{
AERROR << "Path does not exist: " << load_path;
return false;
}
通过“class_loader_manager_ ”加载模块,后面我们会接着分析“ClassLoaderManager”的具体实现,加载好对应的类之后在创建对应的允象,并且初始化对象.〈调用对象的 Initialize(方法,也就是说所有的 cyber 模块都是通过 Initialize()方法启动的,后面们会接团分析 Initialize 具体干了什么) 。这里的“classloader” 其实类似 java 中的 classloader,即 java 虚拟机在运行时加载对应的类,并且实例化人_” 对象。cyber 中其实也是实现了类型通过动态加载并且实例化类的功能,好处是可以动态加载和关闭单个 cyber模块(定位,感知,规划等),也就是在 dreamview 中的模块开关按钮,实际上就是动态的加载和印载对应的模块。
//通过类加载器加载.1oad_path 下的模class_1oader_manager_.LoadLibrary(load path);
//加载模块
for (auto &component : module_config.components())
{
const std::string &class_name = component.class_name();
//创建对象
std::shared_ptr<ComponentBase> base =
class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
调用对象的 Initialize 方法
if (base == nullptr || !base->Initialize(component.config()))
{
return false;
}
component_list_.emplace_back(std::move(base));
}
//加载定时器模块
for (auto &component : module_config.timer_components())
{
const std::string &class_name = component.class_name();
std::shared_ptr<ComponentBase> base =
class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
if (base == nullptr || !base->Initialize(component.config()))
{
return false;
}
component_list_.emplace_back(std::move(base));
}
上述就是 cyber mainboard 的整个流程,cyber main 函数中先解析 dag 参数滞然后根据解析的参数,通过类加载器动态的加载对应的模块,然后调用 Initialize方法初始化模块。
动态加载对象
类加载器(class_loader)类加载器的作用就是动态的加载动态库然后实例化对象。我们先来解释下, 首先 apollo 中的各个 module都会编译为一个动态库,拿 planning 模块来举例子,在“planning/dag/planning.dag”中,会加载:
module_config {
module_library : "/apollo/bazel-bin/modules/planning/libplanning_component.so"
也就是说,apollo 中的模块都会通过类加载器以动态库的方式加载,然后实例化,之后再调用 Initialize 方法初始化。也就是说,我们讲清楚下面 3 个问题,也就是讲清楚了类加载器的原理。
1、cyber 如何加载 apollo 模块?
2、如何实例化模块?
3、如何初始化模块?
类加载器的实现在“cyber/class_loader”目录中,通过“Poco/SharedLibraryh”库来实现动态库的加载,关于 Poco 动态库的加载可以[参考](Class Poco::SharedLibrary)
[email protected]:~/study/apollo/cyber/class_loader$ tree
.
├── BUILD //编译文件
├── class_loader.cc//类加载器
├── class_loader.h
├── class_loader_manager.cc//类加载器管理
├── class_loader_manager.h
├── class_loader_register_macro.h//类加载器注册宏定义
├── class_loader_test.cc
├── shared_library
│ ├── BUILD
│ ├── exceptions.h
│ ├── sample.cc
│ ├── sample.h
│ ├── shared_library.cc
│ ├── shared_library.h
│ └── shared_library_test.cc
├── test
│ ├── base.h
│ ├── BUILD
│ ├── plugin1.cc
│ └── plugin2.cc
└── utility
├── BUILD
├── class_factory.cc //类工厂
├── class_factory.h
├── class_loader_utility.cc//类加载器工具类
└── class_loader_utility.h
3 directories, 23 files
我们先从“class_loaderh”开始看起,首先我们分析下“class_loader”实现的具体方法:
/** * for library load,createclass object */class ClassLoader { public: explicit ClassLoader(const std::string& library_path); virtual ~ClassLoader(); bool IsLibraryLoaded();//库是否已经加载 bool LoadLibrary();// 贡载库 int UnloadLibrary();// 卸载库 const std::string GetLibraryPath() const;// 获取库路径 template <typename Base> std::vector<std::string> GetValidClassNames();// 获取类名称 template <typename Base> std::shared_ptr<Base> CreateClassObj(const std::string& class_name);// 实例化类对象 template <typename Base> bool IsClassValid(const std::string& class_name);//判断类是否有效 private: template <typename Base> void OnClassObjDeleter(Base* obj); private: std::string library_path_;// 类路径 int loadlib_ref_count_;// 类加载引用次数 std::mutex loadlib_ref_count_mutex_;// 类加载引用次数锁 int classobj_ref_count_;// 类引用次数 std::mutex classobj_ref_count_mutex_;// 类引用次数锁};
可以看到类加载器主要是提供了加载类, 邱载类和实例化类的接口。实际上加载类和印载类的实现都比较简单,都是调用“utility ”类中的实现,我们暂时先放一边,先看下实例化对象的实现。
template <typename Base>std::shared_ptr<Base> ClassLoader::CreateClassObj( const std::string &class_name){ // 加载库 if (!IsLibraryLoaded()) { LoadLibrary(); } // 根据类名称创建对象 Base *class_object = utility::CreateClassObj<Base>(class_name, this); if (class_object == nullptr) { AWARN << "CreateClassObj failed, ensure class has been registered. " << "classname: " << class_name << ",lib: " << GetLibraryPath(); return std::shared_ptr<Base>(); } // 类引用计数加1 std::lock_guard<std::mutex> lck(classobj_ref_count_mutex_); classobj_ref_count_ = classobj_ref_count_ + 1; // 指定类的析构函数 std::shared_ptr<Base> classObjSharePtr( class_object, std::bind(&ClassLoader::OnClassObjDeleter<Base>, this, std::placeholders::_1)); return classObjSharePtr;}
可以看到创建类的时候,类引用计数加13. 并且绑定类的析构函数(OnClassObjDeleteD ,删除对象的时候计娄引用计数减1
template <typename Base>void ClassLoader::OnClassObjDeleter(Base *obj){ if (nullptr == obj) { return; } std::lock_guard<std::mutex> lck(classobj_ref_count_mutex_); delete obj; --classobj_ref_count_;}
我们先简单的分析下 ClassLoaderManager,最后再分析 utility 。
类加载管理器 (ClassLoaderManager)
类加载器管理实际上是管理不同的 classloader ,而不同的libpath 对应不同的 classloader 。ClassLoaderManager 主要的数据结构其实如下:
std::map<std::string, ClassLoader *> libpath_loader_map_;
其中“libpath_loader_map_”为 map 结构,在“LoadLibrary”的时候赋值,key 为 library_path,而 value为 ClassLoader,
bool ClassLoaderManager::LoadLibrary(const std::string &library_path){ std::lock_guard<std::mutex> lck(libpath_loader_map_mutex_); if (!IsLibraryValid(library_path)) { libpath_loader_map_[library_path] = new class_loader::ClassLoader(library_path); } return IsLibraryValid(library_path);}
也就是说“ClassLoaderManager”对 ClassLoader 进行保存和管理。最后我们分析下 utility 具体的实现,utility 分为 2 部分,一部分为 ClassFactory, 一部分为工具函数《class_loader_utility.cc)
ClassFactory
可以看到有如下继承关系“ ClassFaetory -> AbstractClassFactory -> AbstractClassFactoryBase ”,其中“ClassFactory“和”AbstractClassFactory ”为模板类,主要的实现在“AbstractClassFactoryBase”中,我们逐个分析:首先是类初始化,指定了“relative_library path”,“base_class_name”,“class_name_”
AbstractClassFactoryBase::AbstractClassFactoryBase( const std::string &class_name, const std::string &base_class_name) : relative_library_path_(""), base_class_name_(base_class_name), class_name_(class_name) {}
设置 OwnedClassLoader,而“RemoveOwnedClassLoader”同理。
void AbstractClassFactoryBase::AddOwnedClassLoader(ClassLoader *loader){ if (std::find(relative_class_loaders_.begin(), relative_class_loaders_.end(), loader) == relative_class_loaders_.end()) { relative_class_loaders_.emplace_back(loader); }}
classloader 是否属于该 classFactory 。
bool AbstractClassFactoryBase::IsOwnedBy(const ClassLoader *loader){ std::vector<ClassLoader *>::iterator itr = std::find( relative_class_loaders_.begin(), relative_class_loaders_.end(), loader); return itr != relative_class_loaders_.end();}
也是说 ClassFactory 能够生产一个路径下的所有类,一个 ClassFactory 可能有好几个 ClassLoader,分为base_class_name 和 class_name。
接下来我们看“class_loader_utility.cc”的实现,文件中实现了很多函数,这个分析如下;
创建对象(CreateClassObj)的具体实现如下,先找到类对应的 factory,然后通过 factory 创建对象。
template <typename Base>
std::shared_ptr<Base> ClassLoaderManager::CreateClassObj(
const std::string &class_name)
{
std::vector<ClassLoader *> class_loaders = GetAllValidClassLoaders();
for (auto class_loader : class_loaders)
{
if (class_loader->IsClassValid<Base>(class_name))
{
return (class_loader->CreateClassObj<Base>(class_name));
}
}
AERROR << "Invalid class name: " << class_name;
return std::shared_ptr<Base>();
}
注册类到 factory 。
template <typename Derived, typename Base>
void RegisterClass(const std::string &class_name,
const std::string &base_class_name)
{
AINFO << "registerclass:" << class_name << "," << base_class_name << ","
<< GetCurLoadingLibraryName();
utility::AbstractClassFactory<Base> *new_class_factrory_obj =
new utility::ClassFactory<Derived, Base>(class_name, base_class_name);
new_class_factrory_obj->AddOwnedClassLoader(GetCurActiveClassLoader());
new_class_factrory_obj->SetRelativeLibraryPath(GetCurLoadingLibraryName());
GetClassFactoryMapMapMutex().lock();
ClassClassFactoryMap &factory_map =
GetClassFactoryMapByBaseClass(typeid(Base).name());
factory_map[class_name] = new_class_factrory_obj;
GetClassFactoryMapMapMutex().unlock();
}
查找 classloader 中所有类的名称。
template <typename Base>std::vector<std::string> GetValidClassNames(ClassLoader *loader){ std::lock_guard<std::recursive_mutex> lck(GetClassFactoryMapMapMutex()); ClassClassFactoryMap &factoryMap = GetClassFactoryMapByBaseClass(typeid(Base).name()); std::vector<std::string> classes; for (auto &class_factory : factoryMap) { AbstractClassFactoryBase *factory = class_factory.second; if (factory && factory->IsOwnedBy(loader)) { classes.emplace_back(class_factory.first); } } return classes;}
加载类,通过指定的 classloader 加载指定路径下的库
bool LoadLibrary(const std::string &library_path, ClassLoader *loader)
{
// 类是否已经被加载,如果被加载则对应的 class_factory 加上依赖的class_loader
if (IsLibraryLoadedByAnybody(library_path))
{
AINFO << "lib has been loaded by others,only attach to class factory obj."
<< library_path;
ClassFactoryVector lib_class_factory_objs =
GetAllClassFactoryObjectsOfLibrary(library_path);
for (auto &class_factory_obj : lib_class_factory_objs)
{
class_factory_obj->AddOwnedClassLoader(loader);
}
return true;
}
SharedLibraryPtr shared_library = nullptr;
static std::recursive_mutex loader_mutex;
{
std::lock_guard<std::recursive_mutex> lck(loader_mutex);
try
{
//设置当前**的classloader,当前加载库路径
SetCurActiveClassLoader(loader);
SetCurLoadingLibraryName(library_path);
shared_library = SharedLibraryPtr(new SharedLibrary(library_path));
}
catch (const LibraryLoadException &e)
{
SetCurLoadingLibraryName("");
SetCurActiveClassLoader(nullptr);
AERROR << "LibraryLoadException: " << e.what();
}
catch (const LibraryAlreadyLoadedException &e)
{
SetCurLoadingLibraryName("");
SetCurActiveClassLoader(nullptr);
AERROR << "LibraryAlreadyLoadedException: " << e.what();
}
catch (const SymbolNotFoundException &e)
{
SetCurLoadingLibraryName("");
SetCurActiveClassLoader(nullptr);
AERROR << "SymbolNotFoundException: " << e.what();
}
SetCurLoadingLibraryName("");
SetCurActiveClassLoader(nullptr);
}
if (shared_library == nullptr)
{
AERROR << "shared library failed: " << library_path;
return false;
}
auto num_lib_objs = GetAllClassFactoryObjectsOfLibrary(library_path).size();
if (num_lib_objs == 0)
{
AWARN << "Class factory objs counts is 0, maybe registerclass failed.";
}
std::lock_guard<std::recursive_mutex> lck(GetLibPathSharedLibMutex());
LibPathSharedLibVector &opened_libraries = GetLibPathSharedLibVector();
// 保存加载路径和对应的 poco_library
opened_libraries.emplace_back(
std::pair<std::string, SharedLibraryPtr>(library_path, shared_library));
return true;
}
上面我们分析了 classloader 动态的加载并月创建类对象, 而在 mainhoard 中通过动态的加载 module, 并且调用模块的 Initialize 方法,实现模块的初始化。下面我们看下模块的初始化过程。
模块初始化
component 概述(cyber 组件)
我们先看下 component 的目录结构。可以看到 cyber 组件分为 2 类:普通组件和定时组件,而二者都继承至基础组件;
[email protected]:~/study/apollo/cyber/component$ tree.├── BUILD├── component_base.h //基础组件├── component.h //组件├── component_test.cc├── timer_component.cc //定时组件├── timer_component.h└── timer_component_test.cc0 directories, 7 files
基础组件 component_base
我们先看下基础组件中实现了什么,也就是“component_base.h”中实现了什么。“component_base.h”实现了“ComponentBase”类,下面我们逐步分析“ComponentBase”类的 public 方法。”1、Initialize 方法Initialize 方法在派生类中重写了,这里有 2 个 Initialize 方法,分别对应上述所说的 2 种类型的组件。
virtual bool Initialize(const ComponentConfig& config) { return false; } virtual bool Initialize(const TimerComponentConfig& config) { return false; }
2、Shutdown 方法
用于关闭 cyber 模块
virtual void Shutdown(){ if (is_shutdown_.exchange(true)) { return; } Clear(); for (auto &reader : readers_) { reader->Shutdown(); } scheduler::Instance()->RemoveTask(node_->Name());}
3、GetProtoConfig 方法
获取 protobuf 格式的配置。
template <typename T>bool GetProtoConfig(T *config) const{ return common::GetProtoFromFile(config_file_path_, config);}
看完公有方法,下面我们看下私有方法.有些简单的方法这里就不详细说了,主要看下“LoadConfigFiies方法,有2 个“LoadConfigFiles” 方法这里只介绍第一个:
void LoadConfigFiles(const ComponentConfig &config){ if (!config.config_file_path().empty()) { if (config.config_file_path()[0] != '/') { config_file_path_ = common::GetAbsolutePath(common::WorkRoot(), config.config_file_path()); } else { config_file_path_ = config.config_file_path(); } } if (!config.flag_file_path().empty()) { std::string flag_file_path = config.flag_file_path(); if (flag_file_path[0] != '/') { flag_file_path = common::GetAbsolutePath(common::WorkRoot(), flag_file_path); } google::SetCommandLineOption("flagfile", flag_file_path.c_str()); }}
5、私有成员变量
最后我们在分析下私有成员变量,也就是说每个组件(componenb)会自动创建一个节点(node),并且可以挂载多个 reader。
std::atomic<bool> is_shutdown_ = {false}; std::shared_ptr<Node> node_ = nullptr; std::string config_file_path_ = ""; std::vector<std::shared_ptr<ReaderBase>> readers_;
下面我们开始分析 component 组件,也就是 Component 类。
Component 组件
Component 类都需要实现“Initialize”和“Process“2 个方法,所以 planning.fouting,perception 等模块都需要实现这 2? 个方法。
template <typename M0>
class Component<M0, NullType, NullType, NullType> : public ComponentBase
{
public:
Component() {}
~Component() override {}
bool Initialize(const ComponentConfig &config) override;
bool Process(const std::shared_ptr<M0> &msg);
private:
virtual bool Proc(const std::shared_ptr<M0> &msg) = 0;
};
我们接着看下这 2 个方法是如何实现的,先看“Process”方法。
1、Process 方法
可以看到 Process 方法比较简单,先判断模块是否关闭,然后执行“Proc”方法。
template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Process(
const std::shared_ptr<M0> &msg0, const std::shared_ptr<M1> &msg1)
{
if (is_shutdown_.load())
{
return true;
}
return Proc(msg0, msg1);
}
2、Initialize 方法
template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize(
const ComponentConfig &config)
{
// 创建node 节点
node_.reset(new Node(config.name()));
//加载配置
LoadConfigFiles(config);
// 订阅消息数和 reader 个数要匹配
if (config.readers_size() < 2)
{
AERROR << "Invalid config file: too few readers.";
return false;
}
//初始化,在基类(ComponentBase)中实现
if (!Init())
{
AERROR << "Component Init() failed.";
return false;
}
bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
//创建 readerl
ReaderConfig reader_cfg;
reader_cfg.channel_name = config.readers(1).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
auto reader1 = node_->template CreateReader<M1>(reader_cfg);
//创建 reader0
reader_cfg.channel_name = config.readers(0).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
std::shared_ptr<Reader<M0>> reader0 = nullptr;
// is _reality_mode 模式则直接创建
if (cyber_likely(is_reality_mode))
{
reader0 = node_->template CreateReader<M0>(reader_cfg);
}
else
{
// 如果不是则创建回调函数
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
config.readers(1).channel());
auto func = [self, blocker1](const std::shared_ptr<M0> &msg0)
{
auto ptr = self.lock();
if (ptr)
{
if (!blocker1->IsPublishedEmpty())
{
auto msg1 = blocker1->GetLatestPublishedPtr();
ptr->Process(msg0, msg1);
}
}
else
{
AERROR << "Component object has been destroyed.";
}
};
reader0 = node_->template CreateReader<M0>(reader_cfg, func);
}
if (reader0 == nullptr || reader1 == nullptr)
{
AERROR << "Component create reader failed.";
return false;
}
// 保存 readers
readers_.push_back(std::move(reader0));
readers_.push_back(std::move(reader1));
if (cyber_unlikely(!is_reality_mode))
{
return true;
}
auto sched = scheduler::Instance();
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto func = [self](const std::shared_ptr<M0> &msg0,
const std::shared_ptr<M1> &msg1)
{
auto ptr = self.lock();
if (ptr)
{
ptr->Process(msg0, msg1);
}
else
{
AERROR << "Component object has been destroyed.";
}
};
std::vector<data::VisitorConfig> config_list;
for (auto &reader : readers_)
{
config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
}
auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
// 创建执程类
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<M0, M1>(func, dv);
return sched->CreateTask(factory, node_->Name());
}
3、component 动态加载
Cyber 主 函数在 “ModuleController:Imit() ”进行模块的加载,具体的加载过程在“ModuleController::LoadModule”中。
bool ModuleController::LoadModule(const DagConfig &dag_config)
{
const std::string work_root = common::WorkRoot();
for (auto module_config : dag_config.module_config())
{
// 1. 加载动态库
std::string load_path;
if (module_config.module_library().front() == '/')
{
load_path = module_config.module_library();
}
else
{
load_path =
common::GetAbsolutePath(work_root, module_config.module_library());
}
if (!common::PathExists(load_path))
{
AERROR << "Path does not exist: " << load_path;
return false;
}
class_loader_manager_.LoadLibrary(load_path);
for (auto &component : module_config.components())
{
const std::string &class_name = component.class_name();
std::shared_ptr<ComponentBase> base =
class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
if (base == nullptr || !base->Initialize(component.config()))
{
return false;
}
component_list_.emplace_back(std::move(base));
}
for (auto &component : module_config.timer_components())
{
const std::string &class_name = component.class_name();
std::shared_ptr<ComponentBase> base =
class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
if (base == nullptr || !base->Initialize(component.config()))
{
return false;
}
component_list_.emplace_back(std::move(base));
}
}
return true;
}
模块首先通过 classloader 加载到内存,然后创建对象,并且调用模块的初始化方法。component 中每个模块都设计为可以动态加载和卸载, 可以实时在线的开启和关闭模块, 实现的方式是通过 classloader 来进行动态的加载动态库。
4、component 初始化
component 一共有 4 个模板类,分别对应接收 0-3 个消息。我们这里主要分析 2 个消息的情况, 其它的可以类推。
template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize(
const ComponentConfig &config)
{
node_.reset(new Node(config.name()));
LoadConfigFiles(config);
if (config.readers_size() < 2)
{
AERROR << "Invalid config file: too few readers.";
return false;
}
if (!Init())
{
AERROR << "Component Init() failed.";
return false;
}
bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
ReaderConfig reader_cfg;
reader_cfg.channel_name = config.readers(1).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
auto reader1 = node_->template CreateReader<M1>(reader_cfg);
reader_cfg.channel_name = config.readers(0).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
std::shared_ptr<Reader<M0>> reader0 = nullptr;
if (cyber_likely(is_reality_mode))
{
reader0 = node_->template CreateReader<M0>(reader_cfg);
}
else
{
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
config.readers(1).channel());
auto func = [self, blocker1](const std::shared_ptr<M0> &msg0)
{
auto ptr = self.lock();
if (ptr)
{
if (!blocker1->IsPublishedEmpty())
{
auto msg1 = blocker1->GetLatestPublishedPtr();
ptr->Process(msg0, msg1);
}
}
else
{
AERROR << "Component object has been destroyed.";
}
};
reader0 = node_->template CreateReader<M0>(reader_cfg, func);
}
if (reader0 == nullptr || reader1 == nullptr)
{
AERROR << "Component create reader failed.";
return false;
}
readers_.push_back(std::move(reader0));
readers_.push_back(std::move(reader1));
if (cyber_unlikely(!is_reality_mode))
{
return true;
}
auto sched = scheduler::Instance();
std::weak_ptr<Component<M0, M1>> self =
std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
auto func = [self](const std::shared_ptr<M0> &msg0,
const std::shared_ptr<M1> &msg1)
{
auto ptr = self.lock();
if (ptr)
{
ptr->Process(msg0, msg1);
}
else
{
AERROR << "Component object has been destroyed.";
}
};
std::vector<data::VisitorConfig> config_list;
for (auto &reader : readers_)
{
config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
}
auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<M0, M1>(func, dv);
return sched->CreateTask(factory, node_->Name());
}
总结 component 流程
对以上 componet 的流程总结如下:
1、创建node 节点〈1 个 component 只能有 1 个 node 节点,之后用户可以用 node 在 init 中自己创建 reader 或writer) 。
2、调用用户自定义的初始化函数 Init0〈子类的 Init方法
3、创建 reader,订阅几个消息就创建几个 reader。
4、创建回调函数,实际上是执行用户定义算法 Proc()函数
5、创建数据访问器,数据访问器的用途为接收数据融合多个通道的数据) ,唤醒对应的协程执行任务。
6、创建协程任务绑定回调函数,并且绑定数据访问器到对应的协程任务,用于唤醒对应的任务。对 cyber 数据的收发流程有了一个简单的介绍,接下去我们会分别介绍如何创建协程、如何在 scheduler
注册任务并且绑定 Notify。也就是说,为了方便理解,你可以认为数据通过 DataDispatcher 已经分发到了对应的 DataVisitor 中,接下来我们只分析如何从 DataVisitor 中取数据,并且触发对应的协程执行回调任务。
创建协程
创建协程对应上述代码。
croutine::RoutineFactory factory = croutine::CreateRoutineFactory<M0>(func, dv);
接下来我们查看下如何创建协程: 协程通过工厂模式方法创建,里面包含一个回调函数和一个 dy 〈数据访问器) 。
template <typename M0, typename F>RoutineFactory CreateRoutineFactory( F &&f, const std::shared_ptr<data::DataVisitor<M0>> &dv){ RoutineFactory factory; factory.SetDataVisitor(dv); factory.create_routine = [ = ]() { return [ = ]() { std::shared_ptr<M0> msg; for (;;) { CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT); if (dv->TryFetch(msg)) { f(msg); CRoutine::Yield(RoutineState::READY); } else { CRoutine::Yield(); } } }; }; return factory;}
上述过程总结如下:
1、工厂中设置 DataVisitor
2、工厂中创建设置协程执行函数,回调包括 3 个步骤 从 DataVisitor 中获取数据,执行回调函数,继续休限。
创建调度任务
创建调度任务是在过程“Component:Initialize”中完成。
sched->CreateTask(factory, node_->Name());
我们接着分析如何在 Scheduler 中创建任务。
bool Scheduler::CreateTask(std::function<void()> &&func, const std::string &name, std::shared_ptr<DataVisitorBase> visitor){ if (cyber_unlikely(stop_.load())) { ADEBUG << "scheduler is stoped, cannot create task!"; return false; } // 1. 根据名称创建任务 ID auto task_id = GlobalData::RegisterTaskName(name); auto cr = std::make_shared<CRoutine>(func); cr->set_id(task_id); cr->set_name(name); AINFO << "create croutine: " << name; //2. 分发协程任务 if (!DispatchTask(cr)) { return false; } //3. 注册 Notify 隐醒任务 if (visitor != nullptr) { visitor->RegisterNotifyCallback([this, task_id]() { if (cyber_unlikely(stop_.load())) { return; } this->NotifyProcessor(task_id); }); } return true;}
TimerComponent
实际上 Component 分为 2 类: 一类是上面介绍的消息驱动的 Component,第二类是定时调用的TimerComponent。定时调度模块没有绑定消息收发,需要用户自己创建 reader 来读取消息, 如果需要读取多个消息,可以创建多个 reader。
bool TimerComponent::Initialize(const TimerComponentConfig &config){ if (!config.has_name() || !config.has_interval()) { AERROR << "Missing required field in config file."; return false; } //1. 创建node node_.reset(new Node(config.name())); LoadConfigFiles(config); //2. 调用用户自定义初始化函数 if (!Init()) { return false; } std::shared_ptr<TimerComponent> self = std::dynamic_pointer_cast<TimerComponent>(shared_from_this()); // 3. 创建定时器,定时调用"Proc() "函数 auto func = [self]() { self->Proc(); }; timer_.reset(new Timer(config.interval(), func, false)); timer_->Start(); return true;}
总结一下 TimerComponent 的执行流程如下。
1、创建Node
2、调用用户自定义初始化函数
3、创建定时器,定时调用"Proc()"函数
上述就是 Component 模块的调用流程。为了于清楚消息的调用过程,下面我们分析“DataDispatcher”和
“DataVisitor”
DataVisitor 和DataDispatcher
DataDispather (消息分发器) 发布消息,DataDispather 是一个单例,-所有的数据分发都在数据分发器中进行, DataDispather 会把数据放到对应的缓存中六然后 Notify(通知)对应的协程(实际上这里调用的是 DataVisitor中注册的 Notify ) 去处理消息。 DataVisitor (消息访问器) 是–个辅助的类,一个数据处理过程对应一个DataVisitor,通过在 DataVisitor 中注册 Notify, (唤醒对应的协程,协程执行绑定的回调函数) ,并且注册对应的 Buffer 到 DataDispather, 这样在:DataDispather 的时候会通知对应的 DataVisitor 去唤醒对应的协程。 也就是说 DataDispather (消息分发器) 发布对应的消息到 Datavisitor,DataVisitor (消息访问器) 唤醒对应的协程,协程中执行绑定的数据处理回调函数。
DataVisitor 数据访问器
DataVisitor 继承至 DataVisitorBase 类,先看 DataVisitorBase 类的实现。
class DataVisitorBase{public: //1. 初始化的时候创建一个 Notifier DataVisitorBase() : notifier_(new Notifier()) {} //2. 设置注册回调 void RegisterNotifyCallback(std::function<void()> &&callback) { notifier_->callback = callback; }protected: DataVisitorBase(const DataVisitorBase &) = delete; DataVisitorBase &operator=(const DataVisitorBase &) = delete; // 3. 下一次消息的下标 uint64_t next_msg_index_ = 0; // 4. DataNotifier 单例 DataNotifier *data_notifier_ = DataNotifier::Instance(); std::shared_ptr<Notifier> notifier_;};
可以看到 DataVisitorBase 创建了一个“Notifier”类,并且提供注册回调的接口。同时还引用了“DataNotifier::Instance()”单例。接下来看“DataVisitor”类的实现。
template <typename M0, typename M1 = NullType, typename M2 = NullType, typename M3 = NullType>class DataVisitor : public DataVisitorBase{public: explicit DataVisitor(const std::vector<VisitorConfig> &configs) : buffer_m0_(configs[0].channel_id, new BufferType<M0>(configs[0].queue_size)), buffer_m1_(configs[1].channel_id, new BufferType<M1>(configs[1].queue_size)), buffer_m2_(configs[2].channel_id, new BufferType<M2>(configs[2].queue_size)), buffer_m3_(configs[3].channel_id, new BufferType<M3>(configs[3].queue_size)) { //在DataDispatcher 中增加 ChannelBuffer DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_); DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_); DataDispatcher<M2>::Instance()->AddBuffer(buffer_m2_); DataDispatcher<M3>::Instance()->AddBuffer(buffer_m3_); //2. 在DataNotifier::Instance()中增加创建好的 Notifier data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_); //3. 对接收到的消息进行数据融合 data_fusion_ = new fusion::AllLatest<M0, M1, M2, M3>( buffer_m0_, buffer_m1_, buffer_m2_, buffer_m3_); } ~DataVisitor() { if (data_fusion_) { delete data_fusion_; data_fusion_ = nullptr; } } bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1, // NOLINT std::shared_ptr<M2> &m2, std::shared_ptr<M3> &m3) // NOLINT { //4: 获取融合数据 if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2, m3)) { next_msg_index_++; return true; } return false; }private: fusion::DataFusion<M0, M1, M2, M3> *data_fusion_ = nullptr; ChannelBuffer<M0> buffer_m0_; ChannelBuffer<M1> buffer_m1_; ChannelBuffer<M2> buffer_m2_; ChannelBuffer<M3> buffer_m3_;};
总结一下 DataVisitor 中实现的功能。
1、在 DataDispatcher 中添加订阅的 ChannelBuffer
2、在 DataNotifier 中增加对应通道的 Notifier
3、通过 DataVisitor 获取数据并进行融合
这里注意:
- 、如果 DataVisitor 只访问一个消息,则不会对消息进行融合,如果 DataVisitor 访问 2 个以上的数据,
那么需要进行融合,并且注册融合回调。之后 CacheBuffer 中会调用融合回调进行数据处理,而不会把数据放
入 CacheBuffer中。
// 1. 只有一个消息的时候直接从 Buffer 中获取消息bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1, // NOLINT std::shared_ptr<M2> &m2) // NOLINT{ if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2)) { next_msg_index_++; return true; } return false;}// 2. 当有 2 个消息的时候,从融合 buffer 中读取消息bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1) // NOLINT{ if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) { next_msg_index_++; return true; } return false;}
- 实际上如果有多个消息的时候,会以第 1 个消息为基准,然后把其它消息的最新消息一起放入融合好的 buffer_fusion_ 。
AllLatest(const ChannelBuffer<M0> &buffer_0, const ChannelBuffer<M1> &buffer_1, const ChannelBuffer<M2> &buffer_2, const ChannelBuffer<M3> &buffer_3) : buffer_m0_(buffer_0), buffer_m1_(buffer_1), buffer_m2_(buffer_2), buffer_m3_(buffer_3), buffer_fusion_(buffer_m0_.channel_id(), new CacheBuffer<std::shared_ptr<FusionDataType>>( buffer_0.Buffer()->Capacity() - uint64_t(1))){ buffer_m0_.Buffer()->SetFusionCallback( [this](const std::shared_ptr<M0> &m0) { std::shared_ptr<M1> m1; std::shared_ptr<M2> m2; std::shared_ptr<M3> m3; if (!buffer_m1_.Latest(m1) || !buffer_m2_.Latest(m2) || !buffer_m3_.Latest(m3)) { return; } auto data = std::make_shared<FusionDataType>(m0, m1, m2, m3); std::lock_guard<std::mutex> lg(buffer_fusion_.Buffer()->Mutex()); buffer_fusion_.Buffer()->Fill(data); });}
- DataFusion 类是一个虚类,定义了数据融合的接口“Fusion0”; Apollo 里只提供了一种数据融合的方式,即以第一个消息的时间为基准,取其它最新的消息,当然也可以在这里实现共它的数据融台方式。
DataDispatcher 数据分发器
接下来我们看 DataDispatcher 的实现。
template <typename T>class DataDispatcher{public: using BufferVector = std::vector<std::weak_ptr<CacheBuffer<std::shared_ptr<T>>>>; ~DataDispatcher() {} //1. 添加 ChannelBuffer 到buffers_map void AddBuffer(const ChannelBuffer<T> &channel_buffer); // 2. 分发通道中的消息 bool Dispatch(const uint64_t channel_id, const std::shared_ptr<T> &msg);private: //3. DataNotifier 单例 DataNotifier *notifier_ = DataNotifier::Instance(); std::mutex buffers_map_mutex_; // 4 险希表,key 为通道 id,value 为订阅通道消息的 CacheBuffer 数组。 AtomicHashMap<uint64_t, BufferVector> buffers_map_; // 5. 单例 DECLARE_SINGLETON(DataDispatcher)};
总结一下 DataDispatcher 的实现。
1、添加 ChannelBuffer 到 buffers_map_,key 为通道 id (topic) ,value 为订阅通道消息的 CacheBuffer 数组。
2、分发通道中的消息。根据通道id,把消息放入对应的 CacheBuffer。然后通过 DataNotifier:Instance0通知对应的通道。如果一个通道(topic)有 3 个 CacheBuffer订阅,那么每次都会往这 3 个 CacheBuffer 中写入当前消息的指针。因为消息是共享的,消息访问的时候需要加锁。那么 DataNotifier 如何通知对应的 Channel 的呢? 理解清楚了 DataNotifier 的数据结构,那么也就理解了DataNotifier 的原理。
class DataNotifier{public: using NotifyVector = std::vector<std::shared_ptr<Notifier>>; ~DataNotifier() {} void AddNotifier(uint64_t channel_id, const std::shared_ptr<Notifier> ¬ifier); bool Notify(const uint64_t channel_id);private: std::mutex notifies_map_mutex_; //1. 险希表,key 为通道 id,value 为 Notify 数组 AtomicHashMap<uint64_t, NotifyVector> notifies_map_; DECLARE_SINGLETON(DataNotifier)};
DataNotifier 中包含一个哈希表,, 表的 key 为通道 it,表的值为 Notify 数组,每个 DataVisitorBase 在初始化的时候会创建一个 Notify 。接着我们看 下CacheBuffer 的实现,CacheBuffer 实际上实现了一个缓存队列,主要关注下 Fill 函数。
void Fill(const T &value){ //1., 融合回调 if (fusion_callback_) { fusion_callback_(value); } else { //2. 如果 Buffer 满,实现循环队列 if (Full()) { buffer_[GetIndex(head_)] = value; ++head_; ++tail_; } else { buffer_[GetIndex(tail_ + 1)] = value; ++tail_; } }}
ChannelBuffer 是 CacheBuffer 的封装,主要看下获取值。
template <typename T>bool ChannelBuffer<T>::Fetch(uint64_t *index, std::shared_ptr<T> &m) // NOLINT{ std::lock_guard<std::mutex> lock(buffer_->Mutex()); if (buffer_->Empty()) { return false; } if (*index == 0) { *index = buffer_->Tail(); } else if (*index == buffer_->Tail() + 1) { return false; } else if (*index < buffer_->Head()) { auto interval = buffer_->Tail() - *index; AWARN << "channel[" << GlobalData::GetChannelById(channel_id_) << "] " << "read buffer overflow, drop_message[" << interval << "] pre_index[" << *index << "] current_index[" << buffer_->Tail() << "] "; *index = buffer_->Tail(); } m = buffer_->at(*index); return true;}
data 目录总结
通过上述的分析,实际上数据的访问都是通过“DataVisitor”来实现,数据的分发通过“DataDispatcher”来实现。reader 中也是通过 DataVisitor 来访问数据,在 reader 中订阅对应的 DataDispatcher。也就是说如果你要订阅一个通道,首先是在 reader 中注册消息的 topic,绑定 DataDispatcher,之后对应通道的消息到来之后,触发 DataDispatcher 分发消息,而 DataDispatcher 通过 DataVisitor 中的 Notify 唤醒协程,从 DataVisitor 中获取消息,并执行协程中绑定的回调函数,以上就是整个消息的收发过程。疑问:Reader 中还拷贝了一份数据到 Blocker 中,实际上数据的处理过程并不需要缓存数据,参考“Planning”模块中的实现都是在回调函数中把数据拷贝到指针中。看注释是说 Blocker 是用来仿真的? 后面需要确实下。以下是 Planning 模块中回调函数中拷贝数据的实现。
上一篇: werkzeug源码阅读笔记(二) 下
下一篇: postman测试脚本