欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Apollo源码分析学习笔记

程序员文章站 2022-07-12 11:47:10
...

Apollo 源码剖析学习笔记

Apollo 项目介绍

Cyber RT 代码分析

cyber base

[email protected]:~/study/apollo/cyber/base$ tree
.
├── atomic_hash_map.h
├── atomic_hash_map_test.cc
├── atomic_rw_lock.h
├── atomic_rw_lock_test.cc
├── bounded_queue.h
├── bounded_queue_test.cc
├── build
├── BUILD
├── CMakeLists.txt
├── concurrent_object_pool.h
├── for_each.h
├── for_each_test.cc
├── macros.h
├── main_test.cc
├── object_pool.h
├── object_pool_test.cc
├── reentrant_rw_lock.h
├── rw_lock_guard.h
├── signal.h
├── signal_test.cc
├── thread_pool.h
├── thread_safe_queue.h
├── unbounded_queue.h
├── unbounded_queue_test.cc
└── wait_strategy.h

1 directory, 24 files

测试的CMakeLists.txt

cmake_minimum_required(VERSION 3.5)
project(CyberBase)

set(CMAKE_CXX_STANDARD 17) 
set(PROJECT_SOURCE_DIR "../../")

INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR})

message(${PROJECT_SOURCE_DIR})

add_executable(atomic_hash_map_test main_test.cc atomic_hash_map_test.cc)
target_link_libraries(atomic_hash_map_test -lgtest -lpthread)
#target_link_libraries(signal_test Cyber::signal)

build目录为代码生成测试目录

cyber 入口

cyber 的入口在"cyber/mainboard"目录中:

[email protected]:~/study/apollo/cyber/mainboard$ tree
.
├── BUILD
├── mainboard.cc //主函数
├── module_argument.cc //模块输入参数
├── module_argument.h
├── module_controller.cc //模块加载,卸载
└── module_controller.h

0 directories, 6 files

mainboard 中的文件比较少,也很好理解,我们先从"mainboard.cc"中开始分析:

#include "cyber/common/global_data.h"
#include "cyber/common/log.h"
#include "cyber/init.h"
#include "cyber/mainboard/module_argument.h"
#include "cyber/mainboard/module_controller.h"
#include "cyber/state.h"

using apollo::cyber::mainboard::ModuleArgument;
using apollo::cyber::mainboard::ModuleController;

int main(int argc, char **argv)
{
    // parse the argument 解析参数
    ModuleArgument module_args;
    module_args.ParseArgument(argc, argv);

    // initialize cyber 初始化 cyber
    apollo::cyber::Init(argv[0]);

    // start module 加载模块
    ModuleController controller(module_args);
    if (!controller.Init())
    {
        controller.Clear();
        AERROR << "module start error.";
        return -1;
    }
	// 等待 cyber 关闭
    apollo::cyber::WaitForShutdown();
    controller.Clear();
    AINFO << "exit mainboard.";

    return 0;
}

上述是“mainboard.cc”的主函数,下面我们重点介绍下具体的过程。

解析参数

解析参数是在“ModuleArgument”类中实现的,主要是解析加载 DAG 文件时候带的参数。

void ModuleArgument::DisplayUsage()
{
    AINFO << "Usage: \n    " << binary_name_ << " [OPTION]...\n"
          << "Description: \n"
          << "    -h, --help : help information \n"
          << "    -d, --dag_conf=CONFIG_FILE : module dag config file\n"
          << "    -p, --process_group=process_group: the process "
          "namespace for running this module, default in manager process\n"
          << "    -s, --sched_name=sched_name: sched policy "
          "conf for hole process, sched_name should be conf in cyber.pb.conf\n"
          << "Example:\n"
          << "    " << binary_name_ << " -h\n"
          << "    " << binary_name_ << " -d dag_conf_file1 -d dag_conf_file2 "
          << "-p process_group -s sched_name\n";
}

void ModuleArgument::ParseArgument(const int argc, char *const argv[])
{
    // 二进制模块名称
    binary_name_ = std::string(basename(argv[0]));
    //解析参数   
    GetOptions(argc, argv);
	// 如果没有 process_group_和 sched_name_,则赋值为虚认值
    if (process_group_.empty())
    {
        process_group_ = DEFAULT_process_group_;
    }
	//如果有,则设置对应的参数
    if (sched_name_.empty())
    {
        sched_name_ = DEFAULT_sched_name_;
    }

    GlobalData::Instance()->SetProcessGroup(process_group_);
    GlobalData::Instance()->SetSchedName(sched_name_);
    AINFO << "binary_name_ is " << binary_name_ << ", process_group_ is "
          << process_group_ << ", has " << dag_conf_list_.size() << " dag conf";
	//打印 dag_conf配置,这里的 dag 是否可以设置多个?
    for (std::string &dag : dag_conf_list_)
    {
        AINFO << "dag_conf: " << dag;
    }
}

模块加载

在“ModuleController”实现 cyber 模块的加载在“ModuleController:Init0”中调用“LoadAlI() ”来加载所有模块,我们接着看 cyber 是如何加载模块。

首先是找到模块的路径。

if (module_config.module_library().front() == '/')
{
    load_path = module_config.module_library();
}
else
{
    load_path =
        common::GetAbsolutePath(work_root, module_config.module_library());
}

if (!common::PathExists(load_path))
{
    AERROR << "Path does not exist: " << load_path;
    return false;
}

通过“class_loader_manager_ ”加载模块,后面我们会接着分析“ClassLoaderManager”的具体实现,加载好对应的类之后在创建对应的允象,并且初始化对象.〈调用对象的 Initialize(方法,也就是说所有的 cyber 模块都是通过 Initialize()方法启动的,后面们会接团分析 Initialize 具体干了什么) 。这里的“classloader” 其实类似 java 中的 classloader,即 java 虚拟机在运行时加载对应的类,并且实例化人_” 对象。cyber 中其实也是实现了类型通过动态加载并且实例化类的功能,好处是可以动态加载和关闭单个 cyber模块(定位,感知,规划等),也就是在 dreamview 中的模块开关按钮,实际上就是动态的加载和印载对应的模块。

//通过类加载器加载.1oad_path 下的模class_1oader_manager_.LoadLibrary(load path);
//加载模块
for (auto &component : module_config.components())
{
    const std::string &class_name = component.class_name();
    //创建对象
    std::shared_ptr<ComponentBase> base =
        class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
     调用对象的 Initialize 方法
    if (base == nullptr || !base->Initialize(component.config()))
    {
        return false;
    }
    component_list_.emplace_back(std::move(base));
}
//加载定时器模块
for (auto &component : module_config.timer_components())
{
    const std::string &class_name = component.class_name();
    std::shared_ptr<ComponentBase> base =
        class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
    if (base == nullptr || !base->Initialize(component.config()))
    {
        return false;
    }
    component_list_.emplace_back(std::move(base));
}

上述就是 cyber mainboard 的整个流程,cyber main 函数中先解析 dag 参数滞然后根据解析的参数,通过类加载器动态的加载对应的模块,然后调用 Initialize方法初始化模块。

动态加载对象

类加载器(class_loader)类加载器的作用就是动态的加载动态库然后实例化对象。我们先来解释下, 首先 apollo 中的各个 module都会编译为一个动态库,拿 planning 模块来举例子,在“planning/dag/planning.dag”中,会加载:

module_config {
  module_library : "/apollo/bazel-bin/modules/planning/libplanning_component.so"

也就是说,apollo 中的模块都会通过类加载器以动态库的方式加载,然后实例化,之后再调用 Initialize 方法初始化。也就是说,我们讲清楚下面 3 个问题,也就是讲清楚了类加载器的原理。

1、cyber 如何加载 apollo 模块?

2、如何实例化模块?

3、如何初始化模块?

类加载器的实现在“cyber/class_loader”目录中,通过“Poco/SharedLibraryh”库来实现动态库的加载,关于 Poco 动态库的加载可以[参考](Class Poco::SharedLibrary)

[email protected]:~/study/apollo/cyber/class_loader$ tree
.
├── BUILD //编译文件
├── class_loader.cc//类加载器
├── class_loader.h
├── class_loader_manager.cc//类加载器管理
├── class_loader_manager.h
├── class_loader_register_macro.h//类加载器注册宏定义
├── class_loader_test.cc
├── shared_library
│   ├── BUILD
│   ├── exceptions.h
│   ├── sample.cc
│   ├── sample.h
│   ├── shared_library.cc
│   ├── shared_library.h
│   └── shared_library_test.cc
├── test
│   ├── base.h
│   ├── BUILD
│   ├── plugin1.cc
│   └── plugin2.cc
└── utility
    ├── BUILD
    ├── class_factory.cc //类工厂
    ├── class_factory.h
    ├── class_loader_utility.cc//类加载器工具类
    └── class_loader_utility.h

3 directories, 23 files

我们先从“class_loaderh”开始看起,首先我们分析下“class_loader”实现的具体方法:

/** *  for library load,createclass object */class ClassLoader { public:  explicit ClassLoader(const std::string& library_path);  virtual ~ClassLoader();	  bool IsLibraryLoaded();//库是否已经加载  bool LoadLibrary();// 贡载库  int UnloadLibrary();// 卸载库  const std::string GetLibraryPath() const;// 获取库路径  template <typename Base>  std::vector<std::string> GetValidClassNames();// 获取类名称  template <typename Base>  std::shared_ptr<Base> CreateClassObj(const std::string& class_name);// 实例化类对象  template <typename Base>  bool IsClassValid(const std::string& class_name);//判断类是否有效 private:  template <typename Base>  void OnClassObjDeleter(Base* obj); private:     std::string library_path_;// 类路径  int loadlib_ref_count_;// 类加载引用次数  std::mutex loadlib_ref_count_mutex_;// 类加载引用次数锁  int classobj_ref_count_;// 类引用次数  std::mutex classobj_ref_count_mutex_;// 类引用次数锁};

可以看到类加载器主要是提供了加载类, 邱载类和实例化类的接口。实际上加载类和印载类的实现都比较简单,都是调用“utility ”类中的实现,我们暂时先放一边,先看下实例化对象的实现。

template <typename Base>std::shared_ptr<Base> ClassLoader::CreateClassObj(    const std::string &class_name){    // 加载库    if (!IsLibraryLoaded())    {        LoadLibrary();    }	// 根据类名称创建对象    Base *class_object = utility::CreateClassObj<Base>(class_name, this);    if (class_object == nullptr)    {        AWARN << "CreateClassObj failed, ensure class has been registered. "              << "classname: " << class_name << ",lib: " << GetLibraryPath();        return std::shared_ptr<Base>();    }	// 类引用计数加1    std::lock_guard<std::mutex> lck(classobj_ref_count_mutex_);    classobj_ref_count_ = classobj_ref_count_ + 1;    // 指定类的析构函数    std::shared_ptr<Base> classObjSharePtr(        class_object, std::bind(&ClassLoader::OnClassObjDeleter<Base>, this,                                std::placeholders::_1));    return classObjSharePtr;}

可以看到创建类的时候,类引用计数加13. 并且绑定类的析构函数(OnClassObjDeleteD ,删除对象的时候计娄引用计数减1

template <typename Base>void ClassLoader::OnClassObjDeleter(Base *obj){    if (nullptr == obj)    {        return;    }    std::lock_guard<std::mutex> lck(classobj_ref_count_mutex_);    delete obj;    --classobj_ref_count_;}

我们先简单的分析下 ClassLoaderManager,最后再分析 utility 。

类加载管理器 (ClassLoaderManager)

类加载器管理实际上是管理不同的 classloader ,而不同的libpath 对应不同的 classloader 。ClassLoaderManager 主要的数据结构其实如下:

std::map<std::string, ClassLoader *> libpath_loader_map_;

其中“libpath_loader_map_”为 map 结构,在“LoadLibrary”的时候赋值,key 为 library_path,而 value为 ClassLoader,

bool ClassLoaderManager::LoadLibrary(const std::string &library_path){    std::lock_guard<std::mutex> lck(libpath_loader_map_mutex_);    if (!IsLibraryValid(library_path))    {        libpath_loader_map_[library_path] =            new class_loader::ClassLoader(library_path);    }    return IsLibraryValid(library_path);}

也就是说“ClassLoaderManager”对 ClassLoader 进行保存和管理。最后我们分析下 utility 具体的实现,utility 分为 2 部分,一部分为 ClassFactory, 一部分为工具函数《class_loader_utility.cc)

ClassFactory

可以看到有如下继承关系“ ClassFaetory -> AbstractClassFactory -> AbstractClassFactoryBase ”,其中“ClassFactory“和”AbstractClassFactory ”为模板类,主要的实现在“AbstractClassFactoryBase”中,我们逐个分析:首先是类初始化,指定了“relative_library path”,“base_class_name”,“class_name_”

AbstractClassFactoryBase::AbstractClassFactoryBase(    const std::string &class_name, const std::string &base_class_name)    : relative_library_path_(""),      base_class_name_(base_class_name),      class_name_(class_name) {}

设置 OwnedClassLoader,而“RemoveOwnedClassLoader”同理。

void AbstractClassFactoryBase::AddOwnedClassLoader(ClassLoader *loader){    if (std::find(relative_class_loaders_.begin(), relative_class_loaders_.end(),                  loader) == relative_class_loaders_.end())    {        relative_class_loaders_.emplace_back(loader);    }}

classloader 是否属于该 classFactory 。

bool AbstractClassFactoryBase::IsOwnedBy(const ClassLoader *loader){    std::vector<ClassLoader *>::iterator itr = std::find(                relative_class_loaders_.begin(), relative_class_loaders_.end(), loader);    return itr != relative_class_loaders_.end();}

也是说 ClassFactory 能够生产一个路径下的所有类,一个 ClassFactory 可能有好几个 ClassLoader,分为base_class_name 和 class_name。

接下来我们看“class_loader_utility.cc”的实现,文件中实现了很多函数,这个分析如下;

创建对象(CreateClassObj)的具体实现如下,先找到类对应的 factory,然后通过 factory 创建对象。

template <typename Base>
std::shared_ptr<Base> ClassLoaderManager::CreateClassObj(
    const std::string &class_name)
{
    std::vector<ClassLoader *> class_loaders = GetAllValidClassLoaders();
    for (auto class_loader : class_loaders)
    {
        if (class_loader->IsClassValid<Base>(class_name))
        {
            return (class_loader->CreateClassObj<Base>(class_name));
        }
    }
    AERROR << "Invalid class name: " << class_name;
    return std::shared_ptr<Base>();
}

注册类到 factory 。

template <typename Derived, typename Base>
void RegisterClass(const std::string &class_name,
                   const std::string &base_class_name)
{
    AINFO << "registerclass:" << class_name << "," << base_class_name << ","
          << GetCurLoadingLibraryName();

    utility::AbstractClassFactory<Base> *new_class_factrory_obj =
        new utility::ClassFactory<Derived, Base>(class_name, base_class_name);
    new_class_factrory_obj->AddOwnedClassLoader(GetCurActiveClassLoader());
    new_class_factrory_obj->SetRelativeLibraryPath(GetCurLoadingLibraryName());

    GetClassFactoryMapMapMutex().lock();
    ClassClassFactoryMap &factory_map =
        GetClassFactoryMapByBaseClass(typeid(Base).name());
    factory_map[class_name] = new_class_factrory_obj;
    GetClassFactoryMapMapMutex().unlock();
}

查找 classloader 中所有类的名称。

template <typename Base>std::vector<std::string> GetValidClassNames(ClassLoader *loader){    std::lock_guard<std::recursive_mutex> lck(GetClassFactoryMapMapMutex());    ClassClassFactoryMap &factoryMap =        GetClassFactoryMapByBaseClass(typeid(Base).name());    std::vector<std::string> classes;    for (auto &class_factory : factoryMap)    {        AbstractClassFactoryBase *factory = class_factory.second;        if (factory && factory->IsOwnedBy(loader))        {            classes.emplace_back(class_factory.first);        }    }    return classes;}

加载类,通过指定的 classloader 加载指定路径下的库

bool LoadLibrary(const std::string &library_path, ClassLoader *loader)
{
    // 类是否已经被加载,如果被加载则对应的 class_factory 加上依赖的class_loader
    if (IsLibraryLoadedByAnybody(library_path))
    {
        AINFO << "lib has been loaded by others,only attach to class factory obj."
              << library_path;
        ClassFactoryVector lib_class_factory_objs =
            GetAllClassFactoryObjectsOfLibrary(library_path);
        for (auto &class_factory_obj : lib_class_factory_objs)
        {
            class_factory_obj->AddOwnedClassLoader(loader);
        }
        return true;
    }

    SharedLibraryPtr shared_library = nullptr;
    static std::recursive_mutex loader_mutex;
    {
        std::lock_guard<std::recursive_mutex> lck(loader_mutex);
        try
        {
            //设置当前**的classloader,当前加载库路径
            SetCurActiveClassLoader(loader);
            SetCurLoadingLibraryName(library_path);
            shared_library = SharedLibraryPtr(new SharedLibrary(library_path));
        }
        catch (const LibraryLoadException &e)
        {
            SetCurLoadingLibraryName("");
            SetCurActiveClassLoader(nullptr);
            AERROR << "LibraryLoadException: " << e.what();
        }
        catch (const LibraryAlreadyLoadedException &e)
        {
            SetCurLoadingLibraryName("");
            SetCurActiveClassLoader(nullptr);
            AERROR << "LibraryAlreadyLoadedException: " << e.what();
        }
        catch (const SymbolNotFoundException &e)
        {
            SetCurLoadingLibraryName("");
            SetCurActiveClassLoader(nullptr);
            AERROR << "SymbolNotFoundException: " << e.what();
        }

        SetCurLoadingLibraryName("");
        SetCurActiveClassLoader(nullptr);
    }

    if (shared_library == nullptr)
    {
        AERROR << "shared library failed: " << library_path;
        return false;
    }

    auto num_lib_objs = GetAllClassFactoryObjectsOfLibrary(library_path).size();
    if (num_lib_objs == 0)
    {
        AWARN << "Class factory objs counts is 0, maybe registerclass failed.";
    }

    std::lock_guard<std::recursive_mutex> lck(GetLibPathSharedLibMutex());
    LibPathSharedLibVector &opened_libraries = GetLibPathSharedLibVector();
	// 保存加载路径和对应的 poco_library
    opened_libraries.emplace_back(
        std::pair<std::string, SharedLibraryPtr>(library_path, shared_library));
    return true;
}

上面我们分析了 classloader 动态的加载并月创建类对象, 而在 mainhoard 中通过动态的加载 module, 并且调用模块的 Initialize 方法,实现模块的初始化。下面我们看下模块的初始化过程。

模块初始化

component 概述(cyber 组件)

我们先看下 component 的目录结构。可以看到 cyber 组件分为 2 类:普通组件和定时组件,而二者都继承至基础组件;

[email protected]:~/study/apollo/cyber/component$ tree.├── BUILD├── component_base.h //基础组件├── component.h //组件├── component_test.cc├── timer_component.cc //定时组件├── timer_component.h└── timer_component_test.cc0 directories, 7 files

基础组件 component_base

我们先看下基础组件中实现了什么,也就是“component_base.h”中实现了什么。“component_base.h”实现了“ComponentBase”类,下面我们逐步分析“ComponentBase”类的 public 方法。”1、Initialize 方法Initialize 方法在派生类中重写了,这里有 2 个 Initialize 方法,分别对应上述所说的 2 种类型的组件。

  virtual bool Initialize(const ComponentConfig& config) { return false; }  virtual bool Initialize(const TimerComponentConfig& config) { return false; }

2、Shutdown 方法

用于关闭 cyber 模块

virtual void Shutdown(){    if (is_shutdown_.exchange(true))    {        return;    }    Clear();    for (auto &reader : readers_)    {        reader->Shutdown();    }    scheduler::Instance()->RemoveTask(node_->Name());}

3、GetProtoConfig 方法

获取 protobuf 格式的配置。

template <typename T>bool GetProtoConfig(T *config) const{    return common::GetProtoFromFile(config_file_path_, config);}

看完公有方法,下面我们看下私有方法.有些简单的方法这里就不详细说了,主要看下“LoadConfigFiies方法,有2 个“LoadConfigFiles” 方法这里只介绍第一个:

void LoadConfigFiles(const ComponentConfig &config){    if (!config.config_file_path().empty())    {        if (config.config_file_path()[0] != '/')        {            config_file_path_ = common::GetAbsolutePath(common::WorkRoot(),                                config.config_file_path());        }        else        {            config_file_path_ = config.config_file_path();        }    }    if (!config.flag_file_path().empty())    {        std::string flag_file_path = config.flag_file_path();        if (flag_file_path[0] != '/')        {            flag_file_path =                common::GetAbsolutePath(common::WorkRoot(), flag_file_path);        }        google::SetCommandLineOption("flagfile", flag_file_path.c_str());    }}

5、私有成员变量

最后我们在分析下私有成员变量,也就是说每个组件(componenb)会自动创建一个节点(node),并且可以挂载多个 reader。

    std::atomic<bool> is_shutdown_ = {false};    std::shared_ptr<Node> node_ = nullptr;    std::string config_file_path_ = "";    std::vector<std::shared_ptr<ReaderBase>> readers_;

下面我们开始分析 component 组件,也就是 Component 类。

Component 组件

Component 类都需要实现“Initialize”和“Process“2 个方法,所以 planning.fouting,perception 等模块都需要实现这 2? 个方法。

template <typename M0>
class Component<M0, NullType, NullType, NullType> : public ComponentBase
{
public:
    Component() {}
    ~Component() override {}
    bool Initialize(const ComponentConfig &config) override;
    bool Process(const std::shared_ptr<M0> &msg);
private:
    virtual bool Proc(const std::shared_ptr<M0> &msg) = 0;
};

我们接着看下这 2 个方法是如何实现的,先看“Process”方法。

1、Process 方法

可以看到 Process 方法比较简单,先判断模块是否关闭,然后执行“Proc”方法。

template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Process(
    const std::shared_ptr<M0> &msg0, const std::shared_ptr<M1> &msg1)
{
    if (is_shutdown_.load())
    {
        return true;
    }
    return Proc(msg0, msg1);
}

2、Initialize 方法

template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize(
    const ComponentConfig &config)
{
    // 创建node 节点
    node_.reset(new Node(config.name()));
    //加载配置
    LoadConfigFiles(config);
	// 订阅消息数和 reader 个数要匹配
    if (config.readers_size() < 2)
    {
        AERROR << "Invalid config file: too few readers.";
        return false;
    }
	//初始化,在基类(ComponentBase)中实现
    if (!Init())
    {
        AERROR << "Component Init() failed.";
        return false;
    }

    bool is_reality_mode = GlobalData::Instance()->IsRealityMode();
	//创建 readerl
    ReaderConfig reader_cfg;
    reader_cfg.channel_name = config.readers(1).channel();
    reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
    reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
	
    auto reader1 = node_->template CreateReader<M1>(reader_cfg);
	//创建 reader0
    reader_cfg.channel_name = config.readers(0).channel();
    reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
    reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();

    std::shared_ptr<Reader<M0>> reader0 = nullptr;
    // is _reality_mode 模式则直接创建
    if (cyber_likely(is_reality_mode))
    {
        reader0 = node_->template CreateReader<M0>(reader_cfg);
    }
    else
    {
        // 如果不是则创建回调函数
        std::weak_ptr<Component<M0, M1>> self =
                                          std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());

        auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
                            config.readers(1).channel());

        auto func = [self, blocker1](const std::shared_ptr<M0> &msg0)
        {
            auto ptr = self.lock();
            if (ptr)
            {
                if (!blocker1->IsPublishedEmpty())
                {
                    auto msg1 = blocker1->GetLatestPublishedPtr();
                    ptr->Process(msg0, msg1);
                }
            }
            else
            {
                AERROR << "Component object has been destroyed.";
            }
        };

        reader0 = node_->template CreateReader<M0>(reader_cfg, func);
    }
    if (reader0 == nullptr || reader1 == nullptr)
    {
        AERROR << "Component create reader failed.";
        return false;
    }
    // 保存 readers
    readers_.push_back(std::move(reader0));
    readers_.push_back(std::move(reader1));

    if (cyber_unlikely(!is_reality_mode))
    {
        return true;
    }

    auto sched = scheduler::Instance();
    std::weak_ptr<Component<M0, M1>> self =
                                      std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
    auto func = [self](const std::shared_ptr<M0> &msg0,
                       const std::shared_ptr<M1> &msg1)
    {
        auto ptr = self.lock();
        if (ptr)
        {
            ptr->Process(msg0, msg1);
        }
        else
        {
            AERROR << "Component object has been destroyed.";
        }
    };

    std::vector<data::VisitorConfig> config_list;
    for (auto &reader : readers_)
    {
        config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
    }
    auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
    // 创建执程类
    croutine::RoutineFactory factory =
        croutine::CreateRoutineFactory<M0, M1>(func, dv);
    return sched->CreateTask(factory, node_->Name());
}

3、component 动态加载

Cyber 主 函数在 “ModuleController:Imit() ”进行模块的加载,具体的加载过程在“ModuleController::LoadModule”中。

bool ModuleController::LoadModule(const DagConfig &dag_config)
{
    const std::string work_root = common::WorkRoot();

    for (auto module_config : dag_config.module_config())
    {	
        // 1. 加载动态库
        std::string load_path;
        if (module_config.module_library().front() == '/')
        {
            load_path = module_config.module_library();
        }
        else
        {
            load_path =
                common::GetAbsolutePath(work_root, module_config.module_library());
        }

        if (!common::PathExists(load_path))
        {
            AERROR << "Path does not exist: " << load_path;
            return false;
        }

        class_loader_manager_.LoadLibrary(load_path);

        for (auto &component : module_config.components())
        {
            const std::string &class_name = component.class_name();
            std::shared_ptr<ComponentBase> base =
                class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
            if (base == nullptr || !base->Initialize(component.config()))
            {
                return false;
            }
            component_list_.emplace_back(std::move(base));
        }

        for (auto &component : module_config.timer_components())
        {
            const std::string &class_name = component.class_name();
            std::shared_ptr<ComponentBase> base =
                class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
            if (base == nullptr || !base->Initialize(component.config()))
            {
                return false;
            }
            component_list_.emplace_back(std::move(base));
        }
    }
    return true;
}

模块首先通过 classloader 加载到内存,然后创建对象,并且调用模块的初始化方法。component 中每个模块都设计为可以动态加载和卸载, 可以实时在线的开启和关闭模块, 实现的方式是通过 classloader 来进行动态的加载动态库。

4、component 初始化

component 一共有 4 个模板类,分别对应接收 0-3 个消息。我们这里主要分析 2 个消息的情况, 其它的可以类推。

template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize(
    const ComponentConfig &config)
{
    node_.reset(new Node(config.name()));
    LoadConfigFiles(config);

    if (config.readers_size() < 2)
    {
        AERROR << "Invalid config file: too few readers.";
        return false;
    }

    if (!Init())
    {
        AERROR << "Component Init() failed.";
        return false;
    }

    bool is_reality_mode = GlobalData::Instance()->IsRealityMode();

    ReaderConfig reader_cfg;
    reader_cfg.channel_name = config.readers(1).channel();
    reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
    reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();

    auto reader1 = node_->template CreateReader<M1>(reader_cfg);

    reader_cfg.channel_name = config.readers(0).channel();
    reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
    reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();

    std::shared_ptr<Reader<M0>> reader0 = nullptr;
    if (cyber_likely(is_reality_mode))
    {
        reader0 = node_->template CreateReader<M0>(reader_cfg);
    }
    else
    {
        std::weak_ptr<Component<M0, M1>> self =
                                          std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());

        auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
                            config.readers(1).channel());

        auto func = [self, blocker1](const std::shared_ptr<M0> &msg0)
        {
            auto ptr = self.lock();
            if (ptr)
            {
                if (!blocker1->IsPublishedEmpty())
                {
                    auto msg1 = blocker1->GetLatestPublishedPtr();
                    ptr->Process(msg0, msg1);
                }
            }
            else
            {
                AERROR << "Component object has been destroyed.";
            }
        };

        reader0 = node_->template CreateReader<M0>(reader_cfg, func);
    }
    if (reader0 == nullptr || reader1 == nullptr)
    {
        AERROR << "Component create reader failed.";
        return false;
    }
    readers_.push_back(std::move(reader0));
    readers_.push_back(std::move(reader1));

    if (cyber_unlikely(!is_reality_mode))
    {
        return true;
    }

    auto sched = scheduler::Instance();
    std::weak_ptr<Component<M0, M1>> self =
                                      std::dynamic_pointer_cast<Component<M0, M1>>(shared_from_this());
    auto func = [self](const std::shared_ptr<M0> &msg0,
                       const std::shared_ptr<M1> &msg1)
    {
        auto ptr = self.lock();
        if (ptr)
        {
            ptr->Process(msg0, msg1);
        }
        else
        {
            AERROR << "Component object has been destroyed.";
        }
    };

    std::vector<data::VisitorConfig> config_list;
    for (auto &reader : readers_)
    {
        config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
    }
    auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
    croutine::RoutineFactory factory =
        croutine::CreateRoutineFactory<M0, M1>(func, dv);
    return sched->CreateTask(factory, node_->Name());
}

总结 component 流程

对以上 componet 的流程总结如下:

1、创建node 节点〈1 个 component 只能有 1 个 node 节点,之后用户可以用 node 在 init 中自己创建 reader 或writer) 。

2、调用用户自定义的初始化函数 Init0〈子类的 Init方法

3、创建 reader,订阅几个消息就创建几个 reader。

4、创建回调函数,实际上是执行用户定义算法 Proc()函数

5、创建数据访问器,数据访问器的用途为接收数据融合多个通道的数据) ,唤醒对应的协程执行任务。

6、创建协程任务绑定回调函数,并且绑定数据访问器到对应的协程任务,用于唤醒对应的任务。对 cyber 数据的收发流程有了一个简单的介绍,接下去我们会分别介绍如何创建协程、如何在 scheduler

注册任务并且绑定 Notify。也就是说,为了方便理解,你可以认为数据通过 DataDispatcher 已经分发到了对应的 DataVisitor 中,接下来我们只分析如何从 DataVisitor 中取数据,并且触发对应的协程执行回调任务。

创建协程

创建协程对应上述代码。

    croutine::RoutineFactory factory =        croutine::CreateRoutineFactory<M0>(func, dv);

接下来我们查看下如何创建协程: 协程通过工厂模式方法创建,里面包含一个回调函数和一个 dy 〈数据访问器) 。

template <typename M0, typename F>RoutineFactory CreateRoutineFactory(    F &&f, const std::shared_ptr<data::DataVisitor<M0>> &dv){    RoutineFactory factory;    factory.SetDataVisitor(dv);    factory.create_routine = [ = ]()    {        return [ = ]()        {            std::shared_ptr<M0> msg;            for (;;)            {                CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);                if (dv->TryFetch(msg))                {                    f(msg);                    CRoutine::Yield(RoutineState::READY);                }                else                {                    CRoutine::Yield();                }            }        };    };    return factory;}

上述过程总结如下:

1、工厂中设置 DataVisitor

2、工厂中创建设置协程执行函数,回调包括 3 个步骤 从 DataVisitor 中获取数据,执行回调函数,继续休限。

创建调度任务

创建调度任务是在过程“Component:Initialize”中完成。

sched->CreateTask(factory, node_->Name());

我们接着分析如何在 Scheduler 中创建任务。

bool Scheduler::CreateTask(std::function<void()> &&func,                           const std::string &name,                           std::shared_ptr<DataVisitorBase> visitor){    if (cyber_unlikely(stop_.load()))    {        ADEBUG << "scheduler is stoped, cannot create task!";        return false;    }	// 1. 根据名称创建任务 ID    auto task_id = GlobalData::RegisterTaskName(name);    auto cr = std::make_shared<CRoutine>(func);    cr->set_id(task_id);    cr->set_name(name);    AINFO << "create croutine: " << name;	//2. 分发协程任务    if (!DispatchTask(cr))    {        return false;    }	//3. 注册 Notify 隐醒任务    if (visitor != nullptr)    {        visitor->RegisterNotifyCallback([this, task_id]()        {            if (cyber_unlikely(stop_.load()))            {                return;            }            this->NotifyProcessor(task_id);        });    }    return true;}

TimerComponent

实际上 Component 分为 2 类: 一类是上面介绍的消息驱动的 Component,第二类是定时调用的TimerComponent。定时调度模块没有绑定消息收发,需要用户自己创建 reader 来读取消息, 如果需要读取多个消息,可以创建多个 reader。

bool TimerComponent::Initialize(const TimerComponentConfig &config){    if (!config.has_name() || !config.has_interval())    {        AERROR << "Missing required field in config file.";        return false;    }    //1. 创建node    node_.reset(new Node(config.name()));    LoadConfigFiles(config);    //2. 调用用户自定义初始化函数    if (!Init())    {        return false;    }    std::shared_ptr<TimerComponent> self =        std::dynamic_pointer_cast<TimerComponent>(shared_from_this());	// 3. 创建定时器,定时调用"Proc() "函数    auto func = [self]() { self->Proc(); };    timer_.reset(new Timer(config.interval(), func, false));    timer_->Start();    return true;}

总结一下 TimerComponent 的执行流程如下。
1、创建Node
2、调用用户自定义初始化函数
3、创建定时器,定时调用"Proc()"函数
上述就是 Component 模块的调用流程。为了于清楚消息的调用过程,下面我们分析“DataDispatcher”和
“DataVisitor”

DataVisitor 和DataDispatcher

DataDispather (消息分发器) 发布消息,DataDispather 是一个单例,-所有的数据分发都在数据分发器中进行, DataDispather 会把数据放到对应的缓存中六然后 Notify(通知)对应的协程(实际上这里调用的是 DataVisitor中注册的 Notify ) 去处理消息。 DataVisitor (消息访问器) 是–个辅助的类,一个数据处理过程对应一个DataVisitor,通过在 DataVisitor 中注册 Notify, (唤醒对应的协程,协程执行绑定的回调函数) ,并且注册对应的 Buffer 到 DataDispather, 这样在:DataDispather 的时候会通知对应的 DataVisitor 去唤醒对应的协程。 也就是说 DataDispather (消息分发器) 发布对应的消息到 Datavisitor,DataVisitor (消息访问器) 唤醒对应的协程,协程中执行绑定的数据处理回调函数。

DataVisitor 数据访问器

DataVisitor 继承至 DataVisitorBase 类,先看 DataVisitorBase 类的实现。

class DataVisitorBase{public:    //1. 初始化的时候创建一个 Notifier    DataVisitorBase() : notifier_(new Notifier()) {}	//2. 设置注册回调    void RegisterNotifyCallback(std::function<void()> &&callback)    {        notifier_->callback = callback;    }protected:    DataVisitorBase(const DataVisitorBase &) = delete;    DataVisitorBase &operator=(const DataVisitorBase &) = delete;	// 3. 下一次消息的下标    uint64_t next_msg_index_ = 0;    // 4. DataNotifier 单例    DataNotifier *data_notifier_ = DataNotifier::Instance();    std::shared_ptr<Notifier> notifier_;};

可以看到 DataVisitorBase 创建了一个“Notifier”类,并且提供注册回调的接口。同时还引用了“DataNotifier::Instance()”单例。接下来看“DataVisitor”类的实现。

template <typename M0, typename M1 = NullType, typename M2 = NullType,          typename M3 = NullType>class DataVisitor : public DataVisitorBase{public:    explicit DataVisitor(const std::vector<VisitorConfig> &configs)        : buffer_m0_(configs[0].channel_id,                     new BufferType<M0>(configs[0].queue_size)),          buffer_m1_(configs[1].channel_id,                     new BufferType<M1>(configs[1].queue_size)),          buffer_m2_(configs[2].channel_id,                     new BufferType<M2>(configs[2].queue_size)),          buffer_m3_(configs[3].channel_id,                     new BufferType<M3>(configs[3].queue_size))    {        //在DataDispatcher 中增加 ChannelBuffer        DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);        DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);        DataDispatcher<M2>::Instance()->AddBuffer(buffer_m2_);        DataDispatcher<M3>::Instance()->AddBuffer(buffer_m3_);         //2. 在DataNotifier::Instance()中增加创建好的 Notifier        data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);        //3. 对接收到的消息进行数据融合        data_fusion_ = new fusion::AllLatest<M0, M1, M2, M3>(            buffer_m0_, buffer_m1_, buffer_m2_, buffer_m3_);    }    ~DataVisitor()    {        if (data_fusion_)        {            delete data_fusion_;            data_fusion_ = nullptr;        }    }    bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1,    // NOLINT                  std::shared_ptr<M2> &m2, std::shared_ptr<M3> &m3)    // NOLINT    {        //4: 获取融合数据        if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2, m3))        {            next_msg_index_++;            return true;        }        return false;    }private:    fusion::DataFusion<M0, M1, M2, M3> *data_fusion_ = nullptr;    ChannelBuffer<M0> buffer_m0_;    ChannelBuffer<M1> buffer_m1_;    ChannelBuffer<M2> buffer_m2_;    ChannelBuffer<M3> buffer_m3_;};

总结一下 DataVisitor 中实现的功能。
1、在 DataDispatcher 中添加订阅的 ChannelBuffer
2、在 DataNotifier 中增加对应通道的 Notifier
3、通过 DataVisitor 获取数据并进行融合
这里注意:

  1. 、如果 DataVisitor 只访问一个消息,则不会对消息进行融合,如果 DataVisitor 访问 2 个以上的数据,
    那么需要进行融合,并且注册融合回调。之后 CacheBuffer 中会调用融合回调进行数据处理,而不会把数据放
    入 CacheBuffer中。
// 1. 只有一个消息的时候直接从 Buffer 中获取消息bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1,  // NOLINT              std::shared_ptr<M2> &m2)                           // NOLINT{    if (data_fusion_->Fusion(&next_msg_index_, m0, m1, m2))    {        next_msg_index_++;        return true;    }    return false;}// 2. 当有 2 个消息的时候,从融合 buffer 中读取消息bool TryFetch(std::shared_ptr<M0> &m0, std::shared_ptr<M1> &m1)    // NOLINT{    if (data_fusion_->Fusion(&next_msg_index_, m0, m1))    {        next_msg_index_++;        return true;    }    return false;}
  1. 实际上如果有多个消息的时候,会以第 1 个消息为基准,然后把其它消息的最新消息一起放入融合好的 buffer_fusion_ 。
AllLatest(const ChannelBuffer<M0> &buffer_0,          const ChannelBuffer<M1> &buffer_1,          const ChannelBuffer<M2> &buffer_2,          const ChannelBuffer<M3> &buffer_3)    : buffer_m0_(buffer_0),      buffer_m1_(buffer_1),      buffer_m2_(buffer_2),      buffer_m3_(buffer_3),      buffer_fusion_(buffer_m0_.channel_id(),                     new CacheBuffer<std::shared_ptr<FusionDataType>>(                         buffer_0.Buffer()->Capacity() - uint64_t(1))){    buffer_m0_.Buffer()->SetFusionCallback(        [this](const std::shared_ptr<M0> &m0)    {        std::shared_ptr<M1> m1;        std::shared_ptr<M2> m2;        std::shared_ptr<M3> m3;        if (!buffer_m1_.Latest(m1) || !buffer_m2_.Latest(m2) ||                !buffer_m3_.Latest(m3))        {            return;        }        auto data = std::make_shared<FusionDataType>(m0, m1, m2, m3);        std::lock_guard<std::mutex> lg(buffer_fusion_.Buffer()->Mutex());        buffer_fusion_.Buffer()->Fill(data);    });}
  1. DataFusion 类是一个虚类,定义了数据融合的接口“Fusion0”; Apollo 里只提供了一种数据融合的方式,即以第一个消息的时间为基准,取其它最新的消息,当然也可以在这里实现共它的数据融台方式。

DataDispatcher 数据分发器

接下来我们看 DataDispatcher 的实现。

template <typename T>class DataDispatcher{public:    using BufferVector =        std::vector<std::weak_ptr<CacheBuffer<std::shared_ptr<T>>>>;    ~DataDispatcher() {}	//1. 添加 ChannelBuffer 到buffers_map    void AddBuffer(const ChannelBuffer<T> &channel_buffer);	// 2. 分发通道中的消息    bool Dispatch(const uint64_t channel_id, const std::shared_ptr<T> &msg);private:    //3. DataNotifier 单例    DataNotifier *notifier_ = DataNotifier::Instance();    std::mutex buffers_map_mutex_;    // 4 险希表,key 为通道 id,value 为订阅通道消息的 CacheBuffer 数组。    AtomicHashMap<uint64_t, BufferVector> buffers_map_;	// 5. 单例    DECLARE_SINGLETON(DataDispatcher)};

总结一下 DataDispatcher 的实现。
1、添加 ChannelBuffer 到 buffers_map_,key 为通道 id (topic) ,value 为订阅通道消息的 CacheBuffer 数组。
2、分发通道中的消息。根据通道id,把消息放入对应的 CacheBuffer。然后通过 DataNotifier:Instance0通知对应的通道。如果一个通道(topic)有 3 个 CacheBuffer订阅,那么每次都会往这 3 个 CacheBuffer 中写入当前消息的指针。因为消息是共享的,消息访问的时候需要加锁。那么 DataNotifier 如何通知对应的 Channel 的呢? 理解清楚了 DataNotifier 的数据结构,那么也就理解了DataNotifier 的原理。

class DataNotifier{public:    using NotifyVector = std::vector<std::shared_ptr<Notifier>>;    ~DataNotifier() {}    void AddNotifier(uint64_t channel_id,                     const std::shared_ptr<Notifier> &notifier);    bool Notify(const uint64_t channel_id);private:    std::mutex notifies_map_mutex_;    //1. 险希表,key 为通道 id,value 为 Notify 数组    AtomicHashMap<uint64_t, NotifyVector> notifies_map_;    DECLARE_SINGLETON(DataNotifier)};

DataNotifier 中包含一个哈希表,, 表的 key 为通道 it,表的值为 Notify 数组,每个 DataVisitorBase 在初始化的时候会创建一个 Notify 。接着我们看 下CacheBuffer 的实现,CacheBuffer 实际上实现了一个缓存队列,主要关注下 Fill 函数。

void Fill(const T &value){    //1., 融合回调    if (fusion_callback_)    {        fusion_callback_(value);    }    else    {        //2. 如果 Buffer 满,实现循环队列        if (Full())        {            buffer_[GetIndex(head_)] = value;            ++head_;            ++tail_;        }        else        {            buffer_[GetIndex(tail_ + 1)] = value;            ++tail_;        }    }}

ChannelBuffer 是 CacheBuffer 的封装,主要看下获取值。

template <typename T>bool ChannelBuffer<T>::Fetch(uint64_t *index,                             std::shared_ptr<T> &m)    // NOLINT{    std::lock_guard<std::mutex> lock(buffer_->Mutex());    if (buffer_->Empty())    {        return false;    }    if (*index == 0)    {        *index = buffer_->Tail();    }    else if (*index == buffer_->Tail() + 1)    {        return false;    }    else if (*index < buffer_->Head())    {        auto interval = buffer_->Tail() - *index;        AWARN << "channel[" << GlobalData::GetChannelById(channel_id_) << "] "              << "read buffer overflow, drop_message[" << interval << "] pre_index["              << *index << "] current_index[" << buffer_->Tail() << "] ";        *index = buffer_->Tail();    }    m = buffer_->at(*index);    return true;}

data 目录总结

通过上述的分析,实际上数据的访问都是通过“DataVisitor”来实现,数据的分发通过“DataDispatcher”来实现。reader 中也是通过 DataVisitor 来访问数据,在 reader 中订阅对应的 DataDispatcher。也就是说如果你要订阅一个通道,首先是在 reader 中注册消息的 topic,绑定 DataDispatcher,之后对应通道的消息到来之后,触发 DataDispatcher 分发消息,而 DataDispatcher 通过 DataVisitor 中的 Notify 唤醒协程,从 DataVisitor 中获取消息,并执行协程中绑定的回调函数,以上就是整个消息的收发过程。疑问:Reader 中还拷贝了一份数据到 Blocker 中,实际上数据的处理过程并不需要缓存数据,参考“Planning”模块中的实现都是在回调函数中把数据拷贝到指针中。看注释是说 Blocker 是用来仿真的? 后面需要确实下。以下是 Planning 模块中回调函数中拷贝数据的实现。

相关标签: apollo 自动驾驶