前言
计算框架是自动驾驶系统中的重中之重,也是整个系统得以高效稳定运行的基础。为了实时地完成感知、决策和执行,系统需要一系列的模块相互紧密配合,高效地执行任务流。由于各种原因,这些模块可能位于不同进程,也可能位于不同机器。这就要求计算框架中具有灵活的、高性能的通信机制。我们知道,Apollo在3.5版本中推出了Cyber RT替代了原先的ROS。之前写过两篇相关的文章介绍了其中的调度部分:《自动驾驶平台Apollo 3.5阅读手记:Cyber RT中的协程(Coroutine)》和《自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的任务调度》。今天就来聊一下其中的另一重要部分-通信系统。
和ROS & ROS2中类似,Cyber RT中支持两种数据交换模式:一种是Publish-Subscribe模式,常用于数据流处理中节点间通信。即发布者(Publisher)在channel(ROS中对应地称为topic)上发布消息,订阅该channel的订阅者(Subscriber)便会收到消息数据;另一种就是常见的Service-Client模式,常用于客户端与服务端的请求与响应。本质上它是可以基于前者实现的。Node
是整个数据拓扑网络中的基本单元。一个Node
中可以创建多个读者/写者,服务端/客户端。读者和写者分别对应Reader
和Writer
,用于Publish-Subscribe模式。服务端和客户端分别对应Service
和Client
,用于Service-Client模式。
实现解析
自动驾驶系统中的各个处理模块基本都是实现为Component
。一个Component
中包含一个Node
,另外会根据需要创建和管理Writer
,Reader
,Service
和Client
。这些用于通信的类下面基于Trasmitter
和Receiver
类。前者用于数据发送,后者用于数据接收。它们是数据传输层的抽象,之下可有多个传输层实现用于不同场景下的传输。如对于Trasmitter
有IntraTransmitter
,ShmTransmitter
,RtpsTransmitter
和HybridTransmitter
。对于Receiver
也是类似的。其中RTPS后端基于Fast RTPS。Fast RTPS是DDS(Data Distribution Service)标准的一个非常流行的开源实现。DDS标准提供了一个平台无关的数据模型,主要用于实时分布式系统。不同的实现可以相互通信。整个通信系统的架构层次图如下。
下面我们就从几个方面深入地看下它们的实现机制。
服务发现与拓扑管理
首先来看下比较基础与核心的服务发现与拓扑管理。其实现主要在目录cyber/service_discovery/
下。节点间通过读和写端建立数据通路。以channel为边,这样可以得到一个数据流图络。由于节点可能退出,订阅情况也可能发生改变,所以这个网络是动态的。因此需要对网络拓扑进行监控。
主要负责这件事的数据结构是TopologyManager
,它是个单例,因为每个进程只要有一个来负责监控网络拓扑就可以了。TopologyManager
有三个子管理器,并有共同的基类Manager
。它们分别为:
- NodeManager
用于管理网络拓扑中的节点。
- ChannelManager
用于管理channel,即网络拓扑中的边。
- ServiceManager
用于管理Service
和Client
。
Cyber RT中有两个层面的拓扑变化的监控:
- 基于Fast RTPS的发现机制
它主要监视网络中是否有参与者加入或退出。TopologyManager::CreateParticipant()
函数创建transport::Participant
对象时会输入包含host name与process id的名称。ParticipantListener
用于监听网络的变化。网络拓扑发生变化时,Fast RTPS传上来ParticipantDiscoveryInfo
,在TopologyManager::Convert()
函数中对该信息转换成Cyber RT中的数据结构ChangeMsg
。然后调用回调函数TopologyManager::OnParticipantChange()
,它会调用其它几个子管理器的OnTopoModuleLeave()
函数。然后子管理器中便可以将相应维护的信息进行更新(如NodeManager
中将相应的节点删除)。
这层拓扑监控主要是通过Fast RTPS提供的自动发现机制。如进程意外退出,则要将各管理中相应信息进行更新。它的优点是如果进程出错或设备断开也可以工作,但粒度比较粗,且不是非常及时(比如断开时)。
- 基于主动式的拓扑变更广播
这一部分主要在TopologyManager::Init()
函数中创建和初始化。在初始化时,会调用它们的StartDiscovery()
函数开始启动自动发现机制。基于TopologyManager
中的RtpsParticipant
对象,这几个子管理会通过CreateSubscriber()
和CreatePublisher()
函数创建相应的subscriber和publisher。子管理器中channel名称分别为node_change_broadcast
,channel_change_broadcast
和service_change_broadcast
。Subscriber的回调函数为Manager::OnRemoteChange()
。该回调函数中会解析拓扑变更消息并调用Dispose()
函数进行处理。
这层拓扑监控是主动式的,即需要相应的地方主动调用Join()
或Leave()
来触发,然后各子管理器中回调函数进行信息的更新。如NodeChannelImpl
创建时会调用NodeManager::Join()
。Reader
和Writer
初始化时会调用JoinTheTopolicy()
函数,继而调用ChannelManager::Join()
函数。相应地,有LeaveTheTopology()
函数表示退出拓扑网络。在这两个函数中,会调用Dispose()
函数,而这个函数是虚函数,在各子管理器中有各自的实现。另外Manager
提供AddChangeListener()
函数注册当拓扑发生变化时的回调函数。举例来说,Reader::JoinTheTopology()
函数中会通过该函数注册回调Reader::OnChannelChange()
。
数据传输
在一个分布式计算系统中,根据两个节点间的位置关系需要使用不同的传输方式(定义在CommunicationMode
中):
- INTRA:如果是同进程的,因为在同一地址空间,直接传指针就完了。
- SHM(Shared memory):如果是同一机器上,但跨进程的,为了高效可以使用共享内存。
- RTPS:如果是跨设备的,那就老老实实通过网络传吧。
示意图如下:
很多时候一个计算图中各种情况都有,所以为了达到最好的性能,需要混合使用。这种混合模式称为HYBRID模式。框架需要根据节点间关系选择合适的传输后端。
每个Writer
有Transmitter
,每个Reader
有Receiver
。它们是负责消息发送与收取的类。Transmitter
与Receiver
的基类为Endpoint
,代表一个通信的端点,它主要的信息是身份标识与属性。其类型为RoleAttributes
(定义在role_attributes.proto
)的成员attr_
包含了host name,process id和一个根据uuid产生的hash值作为id。通过它们就可以判断节点之间的相对位置关系了。
Reader
和Writer
会调用Transport
的方法CreateTransmitter()
和CreateReceiver()
用于创建发送端的transmitter和接收端的receiver。创建时有四种模式可选,分别是INTRA,SHM和RTPS,和HYBRID。最后一种是前三种的混合模式,也是默认的模式。如Transmitter
对应的继承类为IntraTransmitter
,ShmTransmitter
,RtpsTransmitter
和HybridTransmitter
。这几个继承类最主要实现了Transmit()
函数用于传输数据。对于Receiver
来说是类似的,它有4个继承类对应四种传输方式,即IntraReceiver
,ShmReceiver
,RtpsReceiver
和HybridReceiver
。
结合前面提到的几种模式对应的场景,transmitter与receiver的对应关系如下:
前面提到,传输层实现主要有四个实现后端,对应四种模式:
-
RTPS:RTPS部分基于eProsimar的Fast RTPS。
RtpsTransmitter
类中创建和封装publisher。Transmit()
函数将消息序列化成Fast RTP中的格式UnderlayMessage
,然后通过publisher发出去。RtpsReceiver
中的dispatcher_
成员指向单例RtpsDispatcher
。它用于派发RTPS发来的数据,维护了channel id到subscriber的查找表。RtpsDispatcher::AddSubscriber()
函数使用eprosima::fastrtps::Domain::createSubscriber()
函数创建subscriber,其回调统一为RtpsDispatcher::OnMessage()
函数。该函数会将从RTPS通路来的消息进行派发。 -
SHM:
Segment
类表示一块对应一个channel的共享内存,由SegmentFactory::CreateSegment
函数创建。它有两个继承类PosixSegment
和XsiSegment
,是平台相关的实现。在写端,ShmTransmitter::Transmit()
函数用于发送消息,该函数先通过AcquireBlockToWrite()
函数拿一个可写的block。如果发现该Segment
尚未初始化,会调用OpenOrCreate()
通过OS的接口创建共享内存并且map出虚拟地址。这块共享内存区域大体分两部分。一部分为元信息,另一部分为消息数据。后者会被切分为相同大小的block。block的buffer大小默认16K,但遇上消息超出大小的时候会调整。拿到该block后,将消息序列化后写入,并通知读者来取消息。通知机制是通过NotifierBase
实现的。它有两个实现类,分别为ConditionNotifier
和MulticastNotifier
。前者为默认设置。它会单独开一块共享共享专门用于通知,其中包含了ReadableInfo
等信息。MulticastNotifier
的主要区别是它是通过指定的socket广播。在读端,ShmDispatcher::Init()
初始化时会创建专门的线程,线程的执行体为ShmDispatcher::Threadfunc()
函数。它在循环体内会通过Listen()
函数等待新消息。如果有新消息写入后发出通知,这儿就会往下走。基于通知中的ReadableInfo
信息,得到channel id,block index等信息,然后调用ReadMessage()
函数读消息并反序列化。之后调用ShmDispatcher::OnMessage()
函数进行消息派发。 -
INTRA :用于进程内通信。由于读者和写者是在同一进程内,因此可以直接调用。在
IntraTransmitter::Transmit()
函数中,会直接调用读端的IntraDispatcher::OnMessage()
。该函数进行下一步消息的派发。 -
HYBRID:即默认模式,是前三种的结合体。具体功能其实还是交给前面几个后端完成的,只是它会根据读者与写者的关系使用相应的后端。
消息写端
写端的实现相对简单一些。在模块组件中,可以通过CreateWriter()
函数创建Writer
对象,然后就可以通过该对象向指定channel发送消息了。以CameraComponent
为例:
writer_ = node_->CreateWriter<Image>(camera_config_->channel_name());
...
auto pb_image = std::make_shared<Image>();
pb_image->mutable_header()->set_frame_id(camera_config_->frame_id());
pb_image->set_width(raw_image_->width);
pb_image->set_height(raw_image_->height);
pb_image->mutable_data()->reserve(raw_image_->image_size);
...
writer_->Write(pb_image);
这里先创建了Writer
对象,然后填好了消息里的数据(这里发送的消息类型为Image
,定义在modules/drivers/proto/sensor_image.proto
文件),最后调用Writer::Write()
函数将该消息发出。
CreateWriter()
函数中先创建Writer
对象,再调用Writer::Init()
函数进行初始化。初始化中主要通过CreateTransmitter()
函数创建Transmitter
对象。因为默认是HYBRID模式,所以这里实际创建的是HybridTransmitter
对象。Transmitter
继承自Endpoint
类,它其中的属性信息以用来判断读者与写者的相对关系。不同的相对关系决定使用何种Transmitter
对象。其配置在InitMode()
函数中设置:
template <typename M>
void HybridTransmitter<M>::InitMode() {
mode_ = std::make_shared<proto::CommunicationMode>();
mapping_table_[SAME_PROC] = mode_->same_proc();
mapping_table_[DIFF_PROC] = mode_->diff_proc();
mapping_table_[DIFF_HOST] = mode_->diff_host();
}
Writer
对象的初始化中还会将调用JointTheTopology()
函数将之加入到ChannelManager
维护的拓扑信息中。
template <typename MessageT>
void Writer<MessageT>::JoinTheTopology() {
// add listener
change_conn_ = channel_manager_->AddChangeListener(std::bind(
&Writer<MessageT>::OnChannelChange, this, std::placeholders::_1));
// get peer readers
const std::string& channel_name = this->role_attr_.channel_name();
std::vector<proto::RoleAttributes> readers;
channel_manager_->GetReadersOfChannel(channel_name, &readers);
for (auto& reader : readers) {
transmitter_->Enable(reader);
}
channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER,
message::HasSerializer<MessageT>::value);
}
这里还会做一件比较重要的事是enable相应的Transmitter
。先通过ChannelManager
得到该channel相应读者的信息。然后对于每个读者,调用HybridTransmitter::Enable()
函数。HybridTransmitter
是混合模式的Transmitter
,它其实包含了RTPS,SHM和INTRA三种Transmitter
实例。但这三种Transmitter
并不一定都需要用到。比如,如果该消息对应的读者全是同进程的,那就没必要整上SHM和RTPS了。HybridTransmitter::Enable()
函数会根据参数来enable合适的Transmitter
。
template <typename M>
void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) {
auto relation = GetRelation(opposite_attr);
if (relation == NO_RELATION) {
return;
}
uint64_t id = opposite_attr.id();
std::lock_guard<std::mutex> lock(mutex_);
receivers_[mapping_table_[relation]].insert(id);
transmitters_[mapping_table_[relation]]->Enable();
TransmitHistoryMsg(opposite_attr);
}
相应地,在Disable()
函数中决定是否要disable相应的Transmitter
。这样在之后的Transmit()
函数中只要把transmitters_
中的所有Transmitter
拿出来调用Transmit()
函数即可。
发送数据是通过Writer::Write()
函数继而调用Transmitter::Transmit()
函数来实现的。因为这里是用的HybridTransmitter
,因此实际调用的是HybridTransmitter::Transmit()
函数:
template <typename M>
bool HybridTransmitter<M>::Transmit(const MessagePtr& msg,
const MessageInfo& msg_info) {
std::lock_guard<std::mutex> lock(mutex_);
history_->Add(msg, msg_info);
for (auto& item : transmitters_) {
item.second->Transmit(msg, msg_info);
}
return true;
}
可以看到这里分别调用三大Transmitter
的Transmit()
函数发送消息。
消息读端
读端的处理链路相比下复杂一些。先回顾一个Component
中对消息的处理。对于一个Component
来说,它可能会从多个channel收取消息,然后基于所有channel的消息才能处理。第一个channel暂且称之为主channel。这些channel消息的组合我们暂且称为组合消息。我们就来看下典型的两个channel情况,其初始化的主要代码为:
template <typename M0, typename M1>
bool Component<M0, M1, NullType, NullType>::Initialize(
...
ReaderConfig reader_cfg;
reader_cfg.channel_name = config.readers(1).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());
reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();
auto reader1 = node_->template CreateReader<M1>(reader_cfg);
reader_cfg.channel_name = config.readers(0).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();
reader0 = node_->template CreateReader<M0>(reader_cfg);
...
readers_.push_back(std::move(reader0));
readers_.push_back(std::move(reader1));
...
std::vector<data::VisitorConfig> config_list;
for (auto& reader : readers_) {
config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());
}
auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<M0, M1>(func, dv);
return sched->CreateTask(factory, node_->Name());
}
其中对两个channel分别创建Reader
对象。该Reader
对象是针对单个channel的。然后针对所有channel创建DataVisitor
对象,这时就是针对所有channel的组合消息了。最后创建协程来进行组合数据的处理。后面会看到每个Reader
都会有单独的协程来做数据读取。因此,对于一个有n个channel的component,框架会为此创建至少n+1个协程。
其中比较重要的结构就是用于读取消息的Reader
类了。我们先看Reader
对象的创建。其初始化函数Init()
如下:
template <typename MessageT>
bool Reader<MessageT>::Init() {
if (init_.exchange(true)) {
return true;
}
std::function<void(const std::shared_ptr<MessageT>&)> func;
if (reader_func_ != nullptr) {
func = [this](const std::shared_ptr<MessageT>& msg) {
this->Enqueue(msg);
this->reader_func_(msg);
};
} else {
func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); };
}
auto sched = scheduler::Instance();
croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();
auto dv = std::make_shared<data::DataVisitor<MessageT>>(
role_attr_.channel_id(), pending_queue_size_);
// Using factory to wrap templates.
croutine::RoutineFactory factory =
croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);
if (!sched->CreateTask(factory, croutine_name_)) {
AERROR << "Create Task Failed!";
init_.store(false);
return false;
}
receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);
this->role_attr_.set_id(receiver_->id().HashValue());
channel_manager_ =
service_discovery::TopologyManager::Instance()->channel_manager();
JoinTheTopology();
return true;
}
这里主要创建了相应的DataVisitor
类,协程和Receiver
类等。其中DataVisitor
主要用于消息数据的访问。它存放到来的消息数据,并提供接口供消息读取。还是以两个channel的情况为例:
template <typename M0, typename M1>
class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {
public:
explicit DataVisitor(const std::vector<VisitorConfig>& configs)
: buffer_m0_(configs[0].channel_id,
new BufferType<M0>(configs[0].queue_size)),
buffer_m1_(configs[1].channel_id,
new BufferType<M1>(configs[1].queue_size)) {
DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);
DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);
data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);
data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);
}
...
bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) { // NOLINT
if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {
next_msg_index_++;
return true;
}
return false;
}
private:
fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;
ChannelBuffer<M0> buffer_m0_;
ChannelBuffer<M1> buffer_m1_;
};
它的成员变量中对每一个channel都有一个对应的ChannelBuffer
对象。DataDispatcher::AddBuffer()
函数在DataVisitor
初始化时用来将这些个ChannelBuffer
加入到DataDispatcher
的管理中。同时,DataNotifier::AddNotifier()
函数用来以主channel的id为键值加入到DataNotifier
的管理中。DataDispatcher
与DataNotifier
均为单例。前者为模板类,意味着每一个消息类型会有对应的DataDispatcher
对象,且相同消息类型会共享该对象。顾名思义,它主要用于数据传输层有数据来时的分发,即当新消息到来时通过DataDispatcher::Dispatch()
函数把它放到相应的消息缓冲区中。后者用于管理所有的Notifier
。它用于在消息派发完后唤醒相应的协程进行处理。这些对象的大体结构图如下:
当channel多于一个时(组合消息),DataVisitor
中还有一个DataFusion
对象用于将多路channel的数据合并。DataFusion
的实现类为AllLatest
,听名字就知道它会取所有channel中的最新值。除了per-channel的ChannelBuffer
对象外,它还有一个特殊的ChannelBuffer
对象用于存放多channel消息的组合消息(即各个channel的消息类型的tuple)。当填入主channel的消息时,会调用由SetFusionCallback()
函数注册的回调。该回调判断是否所有channel都有消息,如果都有消息的话就将这些消息作为组合消息填入该组合消息的ChannelBuffer
中。 在协程处理函数中会调用DataVisitor::TryFetch()
函数从该ChannelBuffer
中拿组合消息。值得注意的是这件事只在主channel有消息来时才会被触发,因此主channel的选取是有讲究的。
Reader
初始化时创建的另一个关键对象为Receiver
。它有4个继承类,默认为混合模式的HybridReceiver
。HybridReceiver::InitReceivers
中分别创建相应的IntraReceiver
、ShmReceiver
和RtpsReceiver
,放在成员receivers_
数组中。它会来根据写端的情况来enable和disable相应的Receiver
。ReceiverManager
用于管理这些Receiver
对象。它以channel为key进行管理,因此同一进程内订阅同一个channel的会共用同一个Receiver
对象。ReceiverManager::GetReceiver()
函数用于按键值取出Receiver
,如没有,则通过Transport::CreateReceiver()
函数新建一个Receiver
。 这些个Receiver
在Enable()
函数中会通过AddListener()
函数向对应的Dispatcher
注册其回调函数XXXReceiver::OnNewMessage()
。Dispatcher
类中的成员msg_listeners_
是channel id到ListenerHandler
对象的查找表。ListenerHandler
通过signal/slot机制保存了所有这些回调。注意不同传输后端的AddListener()
实现略有不同。比如RtpsDispatcher::AddListener()
函数中会将输入的消息先通过ParseFromString()
函数进行解析,然后调用传入的回调。ShmDispatcher::AddListener()
函数也是类似,它会先通过ParseFromArray()
函数解析消息。而对于IntraDispatcher::AddListener()
,由于是同个进程内,是以消息本身的类型传的,就没必要解析了。
这些相关结构关系示意图如下:
看了一些关键相关数据结构,接下来看下读端的处理流程。首先,如之前介绍的,各Dispatcher
的继承类各显神通使自己的OnMessage()
回调函数被调用。以RtpsDispatcher
为例:
void RtpsDispatcher::OnMessage(uint64_t channel_id,
const std::shared_ptr<std::string>& msg_str,
const MessageInfo& msg_info) {
if (is_shutdown_.load()) {
return;
}
ListenerHandlerBasePtr* handler_base = nullptr;
if (msg_listeners_.Get(channel_id, &handler_base)) {
auto handler =
std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
handler->Run(msg_str, msg_info);
}
}
这里ListenerHandler::Run()
会根据消息的发送者信息找到对应的回调,即Receiver::OnNewMessage()
。
template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
const MessageInfo& msg_info) {
if (msg_listener_ != nullptr) {
msg_listener_(msg, msg_info, attr_);
}
}
这里的回调函数msg_listener_
是在Receiver
创建的时候传入的。其实主要是调用了DataDispatcher::Dispatch()
函数来消息的派发:
transport::Transport::Instance()->CreateReceiver<MessageT>(
role_attr, [](const std::shared_ptr<MessageT>& msg,
const transport::MessageInfo& msg_info,
const proto::RoleAttributes& reader_attr) {
(void)msg_info;
(void)reader_attr;
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::DISPATCH, reader_attr.channel_id(),
msg_info.seq_num());
data::DataDispatcher<MessageT>::Instance()->Dispatch(
reader_attr.channel_id(), msg);
PerfEventCache::Instance()->AddTransportEvent(
TransPerf::NOTIFY, reader_attr.channel_id(),
msg_info.seq_num());
});
DataDisaptcher
是模板类单例,即对于一种特定类型的消息可以共用一个DataDispatcher
。之前在DataVisitor
初始化时会通过AddBuffer()
函数将ChannelBuffer
加入到DataDispatcher
的成员buffers_map_
中。它是一个以channel id为key的map,其value为所有等待该channel上消息的CacheBuffer
的数组。也就是说,消息分发时,只需要根据channel id找到这些buffer,然后将新来的消息填入其中即可。这就是Dispatcher::Dispatch()
函数主要做的事:
template <typename T>
bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,
const std::shared_ptr<T>& msg) {
BufferVector* buffers = nullptr;
if (apollo::cyber::IsShutdown()) {
return false;
}
if (buffers_map_.Get(channel_id, &buffers)) {
for (auto& buffer_wptr : *buffers) {
if (auto buffer = buffer_wptr.lock()) {
std::lock_guard<std::mutex> lock(buffer->Mutex());
buffer->Fill(msg);
}
}
} else {
return false;
}
return notifier_->Notify(channel_id);
}
最后调用DataNotifier::Notify()
函数来通知新消息的到来。它会触发该channel上所有对应Notifier
中的回调。
inline bool DataNotifier::Notify(const uint64_t channel_id) {
NotifyVector* notifies = nullptr;
if (notifies_map_.Get(channel_id, ¬ifies)) {
for (auto& notifier : *notifies) {
if (notifier && notifier->callback) {
notifier->callback();
}
}
return true;
}
return false;
}
这个Notifier
中的回调是在创建协程时通过RegisterNotifyCallback()
函数注册进去的,目的是为了唤醒相应的协程来处理该新消息。
visitor->RegisterNotifyCallback([this, task_id]() {
if (cyber_unlikely(stop_.load())) {
return;
}
this->NotifyProcessor(task_id);
});
NotifyProcessor()
函数会修改对应协程的状态使之能被调度执行。前面提到,对于n个channel输入的component,会有n+1个协程。它们都是以DataVisitor
和消息回调函数一起作为参数创建的。这个协程主体中会调用DataVisitor::TryFetch()
函数拿消息,然后调用注册的消息处理函数:
factory.create_routine = [=]() {
return [=]() {
std::shared_ptr<M0> msg;
for (;;) {
CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
if (dv->TryFetch(msg)) {
f(msg);
CRoutine::Yield(RoutineState::READY);
} else {
CRoutine::Yield();
}
}
};
};
对于那n个消息读取协程来说,其消息处理函数为:
func = [this](const std::shared_ptr<MessageT>& msg) {
this->Enqueue(msg);
this->reader_func_(msg);
};
这个回调函数中会调用Reader::Enqueue()
函数。在该函数中,主要调用Blocker::Publish()
函数,它继而调用Blocker::Enqueue()
和Blocker::Notify()
函数。Blocker
类是一个存储消息的结构。BlockerManager
类用于管理Blocker
,其中维护了以channel为键值的Blocker
的map。Reader::Enqueue()
函数将消息放到Blocker
的成员published_msg_queue_
队列中。之后,可以通过Blocker::Observe()
函数将成员published_msg_queue_
队列的消息放到成员observed_msg_queue_
队列,然后通过Blocker::GetLatestObserved()
函数得到最新的消息。比如ControlComponent
中的:
chassis_reader_->Observe();
const auto &chassis_msg = chassis_reader_->GetLatestObserved();
而对于剩下那一个协程,它是由主channel来触发的。因它处理的是多channel的组合消息,在协程主体中的TryFetch()
函数会调用AllLatest::Fusion()
函数同时拿多个channel上的最新消息。至于这个组合消息是怎么填入的前面有提。简单来说,对于它来说,主channel来消息时,同时也会将其它channel的消息写入组合消息。然后调度协程,拿出组合消息进行处理。其消息处理函数为:
auto func = [self](const std::shared_ptr<M0>& msg0,
const std::shared_ptr<M1>& msg1) {
auto ptr = self.lock();
if (ptr) {
ptr->Process(msg0, msg1);
} else {
AERROR << "Component object has been destroyed.";
}
};
该实现中主要以收到的消息为参数调用Component
中的处理函数Process()
,从而执行组件的自定义处理逻辑。
小结
文中提了不少细枝末节,最后非常high-level地概括下从写者到读者的流程。写者Writer
写消息时,会通过HybridTransmitter
继而使用合适后端的Transmitter
发送消息。根据读与写者间的位置关系,经过网络、共享内存或直接调用的方式,对应后端的Dispatcher
收到消息。收到后转成指定消息类型,交给Receiver
。然后通过DataDispatcher
派发消息。派发消息就是将消息放到对应的buffer中,然后通知相应的协程来作进一步处理。上层模块要取用这些消息,主要两种方式:一种是通过Component
的Proc()
接口,它被调用时参数就是最新的消息。另一种是通过Reader
的Observe()
函数直接拿。
我们知道,Apollo在版本3.5前是基于ROS的,同时也对ROS做了几个重要改进。这些改进不少是关于通信系统的,如共享内存、去中心化和数据兼容性。到Cyber RT的演进也自然延续了这几个优点。总得来说,Cyber RT基于自动发现机制与Publish-Subscribe模式实现了通信网络的拓扑管理。同时它对数据传输层做了抽象,下面实现多个后端分别适用于不同场景,并提供了HYBRID模式可以根据读者和写者间的关系自动使用合适的传输层后端。这样,通信系统的复杂性就被很好地屏蔽,框架就能提供给应用层便利的开发接口。