前言
计算框架是自动驾驶系统中的重中之重,也是整个系统得以高效稳定运行的基础。为了实时地完成感知、决策和执行,系统须要一系列的模块相互紧密配合,高效地执行任务流。因为各类缘由,这些模块可能位于不一样进程,也可能位于不一样机器。这就要求计算框架中具备灵活的、高性能的通讯机制。咱们知道,Apollo在3.5版本中推出了Cyber RT替代了原先的ROS。以前写过两篇相关的文章介绍了其中的调度部分:《自动驾驶平台Apollo 3.5阅读手记:Cyber RT中的协程(Coroutine)》和《自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的任务调度》。今天就来聊一下其中的另外一重要部分-通讯系统。node
和ROS & ROS2中相似,Cyber RT中支持两种数据交换模式:一种是Publish-Subscribe模式,经常使用于数据流处理中节点间通讯。即发布者(Publisher)在channel(ROS中对应地称为topic)上发布消息,订阅该channel的订阅者(Subscriber)便会收到消息数据;另外一种就是常见的Service-Client模式,经常使用于客户端与服务端的请求与响应。本质上它是能够基于前者实现的。Node
是整个数据拓扑网络中的基本单元。一个Node
中能够建立多个读者/写者,服务端/客户端。读者和写者分别对应Reader
和Writer
,用于Publish-Subscribe模式。服务端和客户端分别对应Service
和Client
,用于Service-Client模式。后端
实现解析
自动驾驶系统中的各个处理模块基本都是实现为Component
。一个Component
中包含一个Node
,另外会根据须要建立和管理Writer
,Reader
,Service
和Client
。这些用于通讯的类下面基于Trasmitter
和Receiver
类。前者用于数据发送,后者用于数据接收。它们是数据传输层的抽象,之下可有多个传输层实现用于不一样场景下的传输。如对于Trasmitter
有IntraTransmitter
,ShmTransmitter
,RtpsTransmitter
和HybridTransmitter
。对于Receiver
也是相似的。其中RTPS后端基于Fast RTPS。Fast RTPS是DDS(Data Distribution Service)标准的一个很是流行的开源实现。DDS标准提供了一个平台无关的数据模型,主要用于实时分布式系统。不一样的实现能够相互通讯。整个通讯系统的架构层次图以下。
数组
下面咱们就从几个方面深刻地看下它们的实现机制。网络
服务发现与拓扑管理
首先来看下比较基础与核心的服务发现与拓扑管理。其实现主要在目录cyber/service_discovery/
下。节点间经过读和写端创建数据通路。以channel为边,这样能够获得一个数据流图络。因为节点可能退出,订阅状况也可能发生改变,因此这个网络是动态的。所以须要对网络拓扑进行监控。数据结构
主要负责这件事的数据结构是TopologyManager
,它是个单例,由于每一个进程只要有一个来负责监控网络拓扑就能够了。TopologyManager
有三个子管理器,并有共同的基类Manager
。它们分别为:
- NodeManager
用于管理网络拓扑中的节点。
- ChannelManager
用于管理channel,即网络拓扑中的边。
- ServiceManager
用于管理Service
和Client
。
架构
Cyber RT中有两个层面的拓扑变化的监控:app
- 基于Fast RTPS的发现机制
它主要监视网络中是否有参与者加入或退出。TopologyManager::CreateParticipant()
函数建立transport::Participant
对象时会输入包含host name与process id的名称。ParticipantListener
用于监听网络的变化。网络拓扑发生变化时,Fast RTPS传上来ParticipantDiscoveryInfo
,在TopologyManager::Convert()
函数中对该信息转换成Cyber RT中的数据结构ChangeMsg
。而后调用回调函数TopologyManager::OnParticipantChange()
,它会调用其它几个子管理器的OnTopoModuleLeave()
函数。而后子管理器中即可以将相应维护的信息进行更新(如NodeManager
中将相应的节点删除)。框架
这层拓扑监控主要是经过Fast RTPS提供的自动发现机制。如进程意外退出,则要将各管理中相应信息进行更新。它的优势是若是进程出错或设备断开也能够工做,但粒度比较粗,且不是很是及时(好比断开时)。socket
- 基于主动式的拓扑变动广播
这一部分主要在TopologyManager::Init()
函数中建立和初始化。在初始化时,会调用它们的StartDiscovery()
函数开始启动自动发现机制。基于TopologyManager
中的RtpsParticipant
对象,这几个子管理会经过CreateSubscriber()
和CreatePublisher()
函数建立相应的subscriber和publisher。子管理器中channel名称分别为node_change_broadcast
,channel_change_broadcast
和service_change_broadcast
。Subscriber的回调函数为Manager::OnRemoteChange()
。该回调函数中会解析拓扑变动消息并调用Dispose()
函数进行处理。分布式
这层拓扑监控是主动式的,即须要相应的地方主动调用Join()
或Leave()
来触发,而后各子管理器中回调函数进行信息的更新。如NodeChannelImpl
建立时会调用NodeManager::Join()
。Reader
和Writer
初始化时会调用JoinTheTopolicy()
函数,继而调用ChannelManager::Join()
函数。相应地,有LeaveTheTopology()
函数表示退出拓扑网络。在这两个函数中,会调用Dispose()
函数,而这个函数是虚函数,在各子管理器中有各自的实现。另外Manager
提供AddChangeListener()
函数注册当拓扑发生变化时的回调函数。举例来讲,Reader::JoinTheTopology()
函数中会经过该函数注册回调Reader::OnChannelChange()
。
数据传输
在一个分布式计算系统中,根据两个节点间的位置关系须要使用不一样的传输方式(定义在CommunicationMode
中):
- INTRA:若是是同进程的,由于在同一地址空间,直接传指针就完了。
- SHM(Shared memory):若是是同一机器上,但跨进程的,为了高效可使用共享内存。
- RTPS:若是是跨设备的,那就老老实实经过网络传吧。
示意图以下:
不少时候一个计算图中各类状况都有,因此为了达到最好的性能,须要混合使用。这种混合模式称为HYBRID模式。框架须要根据节点间关系选择合适的传输后端。
每一个Writer
有Transmitter
,每一个Reader
有Receiver
。它们是负责消息发送与收取的类。Transmitter
与Receiver
的基类为Endpoint
,表明一个通讯的端点,它主要的信息是身份标识与属性。其类型为RoleAttributes
(定义在role_attributes.proto
)的成员attr_
包含了host name,process id和一个根据uuid产生的hash值做为id。经过它们就能够判断节点之间的相对位置关系了。
Reader
和Writer
会调用Transport
的方法CreateTransmitter()
和CreateReceiver()
用于建立发送端的transmitter和接收端的receiver。建立时有四种模式可选,分别是INTRA,SHM和RTPS,和HYBRID。最后一种是前三种的混合模式,也是默认的模式。如Transmitter
对应的继承类为IntraTransmitter
,ShmTransmitter
,RtpsTransmitter
和HybridTransmitter
。这几个继承类最主要实现了Transmit()
函数用于传输数据。对于Receiver
来讲是相似的,它有4个继承类对应四种传输方式,即IntraReceiver
,ShmReceiver
,RtpsReceiver
和HybridReceiver
。
结合前面提到的几种模式对应的场景,transmitter与receiver的对应关系以下:
前面提到,传输层实现主要有四个实现后端,对应四种模式:
-
RTPS:RTPS部分基于eProsimar的Fast RTPS。
RtpsTransmitter
类中建立和封装publisher。Transmit()
函数将消息序列化成Fast RTP中的格式UnderlayMessage
,而后经过publisher发出去。RtpsReceiver
中的dispatcher_
成员指向单例RtpsDispatcher
。它用于派发RTPS发来的数据,维护了channel id到subscriber的查找表。RtpsDispatcher::AddSubscriber()
函数使用eprosima::fastrtps::Domain::createSubscriber()
函数建立subscriber,其回调统一为RtpsDispatcher::OnMessage()
函数。该函数会将从RTPS通路来的消息进行派发。 -
SHM:
Segment
类表示一块对应一个channel的共享内存,由SegmentFactory::CreateSegment
函数建立。它有两个继承类PosixSegment
和XsiSegment
,是平台相关的实现。在写端,ShmTransmitter::Transmit()
函数用于发送消息,该函数先经过AcquireBlockToWrite()
函数拿一个可写的block。若是发现该Segment
还没有初始化,会调用OpenOrCreate()
经过OS的接口建立共享内存而且map出虚拟地址。这块共享内存区域大致分两部分。一部分为元信息,另外一部分为消息数据。后者会被切分为相同大小的block。block的buffer大小默认16K,但赶上消息超出大小的时候会调整。拿到该block后,将消息序列化后写入,并通知读者来取消息。通知机制是经过NotifierBase
实现的。它有两个实现类,分别为ConditionNotifier
和MulticastNotifier
。前者为默认设置。它会单独开一块共享共享专门用于通知,其中包含了ReadableInfo
等信息。MulticastNotifier
的主要区别是它是经过指定的socket广播。在读端,ShmDispatcher::Init()
初始化时会建立专门的线程,线程的执行体为ShmDispatcher::Threadfunc()
函数。它在循环体内会经过Listen()
函数等待新消息。若是有新消息写入后发出通知,这儿就会往下走。基于通知中的ReadableInfo
信息,获得channel id,block index等信息,而后调用ReadMessage()
函数读消息并反序列化。以后调用ShmDispatcher::OnMessage()
函数进行消息派发。 -
INTRA :用于进程内通讯。因为读者和写者是在同一进程内,所以能够直接调用。在
IntraTransmitter::Transmit()
函数中,会直接调用读端的IntraDispatcher::OnMessage()
。该函数进行下一步消息的派发。 -
HYBRID:即默认模式,是前三种的结合体。具体功能其实仍是交给前面几个后端完成的,只是它会根据读者与写者的关系使用相应的后端。
消息写端
写端的实现相对简单一些。在模块组件中,能够经过CreateWriter()
函数建立Writer
对象,而后就能够经过该对象向指定channel发送消息了。以CameraComponent
为例:
writer_ = node_->CreateWriter<Image>(camera_config_->channel_name()); ... auto pb_image = std::make_shared<Image>(); pb_image->mutable_header()->set_frame_id(camera_config_->frame_id()); pb_image->set_width(raw_image_->width); pb_image->set_height(raw_image_->height); pb_image->mutable_data()->reserve(raw_image_->image_size); ... writer_->Write(pb_image);
这里先建立了Writer
对象,而后填好了消息里的数据(这里发送的消息类型为Image
,定义在modules/drivers/proto/sensor_image.proto
文件),最后调用Writer::Write()
函数将该消息发出。
CreateWriter()
函数中先建立Writer
对象,再调用Writer::Init()
函数进行初始化。初始化中主要经过CreateTransmitter()
函数建立Transmitter
对象。由于默认是HYBRID模式,因此这里实际建立的是HybridTransmitter
对象。Transmitter
继承自Endpoint
类,它其中的属性信息以用来判断读者与写者的相对关系。不一样的相对关系决定使用何种Transmitter
对象。其配置在InitMode()
函数中设置:
template <typename M> void HybridTransmitter<M>::InitMode() { mode_ = std::make_shared<proto::CommunicationMode>(); mapping_table_[SAME_PROC] = mode_->same_proc(); mapping_table_[DIFF_PROC] = mode_->diff_proc(); mapping_table_[DIFF_HOST] = mode_->diff_host(); }
Writer
对象的初始化中还会将调用JointTheTopology()
函数将之加入到ChannelManager
维护的拓扑信息中。
template <typename MessageT> void Writer<MessageT>::JoinTheTopology() { // add listener change_conn_ = channel_manager_->AddChangeListener(std::bind( &Writer<MessageT>::OnChannelChange, this, std::placeholders::_1)); // get peer readers const std::string& channel_name = this->role_attr_.channel_name(); std::vector<proto::RoleAttributes> readers; channel_manager_->GetReadersOfChannel(channel_name, &readers); for (auto& reader : readers) { transmitter_->Enable(reader); } channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER, message::HasSerializer<MessageT>::value); }
这里还会作一件比较重要的事是enable相应的Transmitter
。先经过ChannelManager
获得该channel相应读者的信息。而后对于每一个读者,调用HybridTransmitter::Enable()
函数。HybridTransmitter
是混合模式的Transmitter
,它其实包含了RTPS,SHM和INTRA三种Transmitter
实例。但这三种Transmitter
并不必定都须要用到。好比,若是该消息对应的读者全是同进程的,那就不必整上SHM和RTPS了。HybridTransmitter::Enable()
函数会根据参数来enable合适的Transmitter
。
template <typename M> void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) { auto relation = GetRelation(opposite_attr); if (relation == NO_RELATION) { return; } uint64_t id = opposite_attr.id(); std::lock_guard<std::mutex> lock(mutex_); receivers_[mapping_table_[relation]].insert(id); transmitters_[mapping_table_[relation]]->Enable(); TransmitHistoryMsg(opposite_attr); }
相应地,在Disable()
函数中决定是否要disable相应的Transmitter
。这样在以后的Transmit()
函数中只要把transmitters_
中的全部Transmitter
拿出来调用Transmit()
函数便可。
发送数据是经过Writer::Write()
函数继而调用Transmitter::Transmit()
函数来实现的。由于这里是用的HybridTransmitter
,所以实际调用的是HybridTransmitter::Transmit()
函数:
template <typename M> bool HybridTransmitter<M>::Transmit(const MessagePtr& msg, const MessageInfo& msg_info) { std::lock_guard<std::mutex> lock(mutex_); history_->Add(msg, msg_info); for (auto& item : transmitters_) { item.second->Transmit(msg, msg_info); } return true; }
能够看到这里分别调用三大Transmitter
的Transmit()
函数发送消息。
消息读端
读端的处理链路相比下复杂一些。先回顾一个Component
中对消息的处理。对于一个Component
来讲,它可能会从多个channel收取消息,而后基于全部channel的消息才能处理。第一个channel暂且称之为主channel。这些channel消息的组合咱们暂且称为组合消息。咱们就来看下典型的两个channel状况,其初始化的主要代码为:
template <typename M0, typename M1> bool Component<M0, M1, NullType, NullType>::Initialize( ... ReaderConfig reader_cfg; reader_cfg.channel_name = config.readers(1).channel(); reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile()); reader_cfg.pending_queue_size = config.readers(1).pending_queue_size(); auto reader1 = node_->template CreateReader<M1>(reader_cfg); reader_cfg.channel_name = config.readers(0).channel(); reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile()); reader_cfg.pending_queue_size = config.readers(0).pending_queue_size(); reader0 = node_->template CreateReader<M0>(reader_cfg); ... readers_.push_back(std::move(reader0)); readers_.push_back(std::move(reader1)); ... std::vector<data::VisitorConfig> config_list; for (auto& reader : readers_) { config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize()); } auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list); croutine::RoutineFactory factory = croutine::CreateRoutineFactory<M0, M1>(func, dv); return sched->CreateTask(factory, node_->Name()); }
其中对两个channel分别建立Reader
对象。该Reader
对象是针对单个channel的。而后针对全部channel建立DataVisitor
对象,这时就是针对全部channel的组合消息了。最后建立协程来进行组合数据的处理。后面会看到每一个Reader
都会有单独的协程来作数据读取。所以,对于一个有n个channel的component,框架会为此建立至少n+1个协程。
其中比较重要的结构就是用于读取消息的Reader
类了。咱们先看Reader
对象的建立。其初始化函数Init()
以下:
template <typename MessageT> bool Reader<MessageT>::Init() { if (init_.exchange(true)) { return true; } std::function<void(const std::shared_ptr<MessageT>&)> func; if (reader_func_ != nullptr) { func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); this->reader_func_(msg); }; } else { func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); }; } auto sched = scheduler::Instance(); croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name(); auto dv = std::make_shared<data::DataVisitor<MessageT>>( role_attr_.channel_id(), pending_queue_size_); // Using factory to wrap templates. croutine::RoutineFactory factory = croutine::CreateRoutineFactory<MessageT>(std::move(func), dv); if (!sched->CreateTask(factory, croutine_name_)) { AERROR << "Create Task Failed!"; init_.store(false); return false; } receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_); this->role_attr_.set_id(receiver_->id().HashValue()); channel_manager_ = service_discovery::TopologyManager::Instance()->channel_manager(); JoinTheTopology(); return true; }
这里主要建立了相应的DataVisitor
类,协程和Receiver
类等。其中DataVisitor
主要用于消息数据的访问。它存放到来的消息数据,并提供接口供消息读取。仍是以两个channel的状况为例:
template <typename M0, typename M1> class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase { public: explicit DataVisitor(const std::vector<VisitorConfig>& configs) : buffer_m0_(configs[0].channel_id, new BufferType<M0>(configs[0].queue_size)), buffer_m1_(configs[1].channel_id, new BufferType<M1>(configs[1].queue_size)) { DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_); DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_); data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_); data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_); } ... bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) { // NOLINT if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) { next_msg_index_++; return true; } return false; } private: fusion::DataFusion<M0, M1>* data_fusion_ = nullptr; ChannelBuffer<M0> buffer_m0_; ChannelBuffer<M1> buffer_m1_; };
它的成员变量中对每个channel都有一个对应的ChannelBuffer
对象。DataDispatcher::AddBuffer()
函数在DataVisitor
初始化时用来将这些个ChannelBuffer
加入到DataDispatcher
的管理中。同时,DataNotifier::AddNotifier()
函数用来以主channel的id为键值加入到DataNotifier
的管理中。DataDispatcher
与DataNotifier
均为单例。前者为模板类,意味着每个消息类型会有对应的DataDispatcher
对象,且相同消息类型会共享该对象。顾名思义,它主要用于数据传输层有数据来时的分发,即当新消息到来时经过DataDispatcher::Dispatch()
函数把它放到相应的消息缓冲区中。后者用于管理全部的Notifier
。它用于在消息派发完后唤醒相应的协程进行处理。这些对象的大致结构图以下:
当channel多于一个时(组合消息),DataVisitor
中还有一个DataFusion
对象用于将多路channel的数据合并。DataFusion
的实现类为AllLatest
,听名字就知道它会取全部channel中的最新值。除了per-channel的ChannelBuffer
对象外,它还有一个特殊的ChannelBuffer
对象用于存放多channel消息的组合消息(即各个channel的消息类型的tuple)。当填入主channel的消息时,会调用由SetFusionCallback()
函数注册的回调。该回调判断是否全部channel都有消息,若是都有消息的话就将这些消息做为组合消息填入该组合消息的ChannelBuffer
中。 在协程处理函数中会调用DataVisitor::TryFetch()
函数从该ChannelBuffer
中拿组合消息。值得注意的是这件事只在主channel有消息来时才会被触发,所以主channel的选取是有讲究的。
Reader
初始化时建立的另外一个关键对象为Receiver
。它有4个继承类,默认为混合模式的HybridReceiver
。HybridReceiver::InitReceivers
中分别建立相应的IntraReceiver
、ShmReceiver
和RtpsReceiver
,放在成员receivers_
数组中。它会来根据写端的状况来enable和disable相应的Receiver
。ReceiverManager
用于管理这些Receiver
对象。它以channel为key进行管理,所以同一进程内订阅同一个channel的会共用同一个Receiver
对象。ReceiverManager::GetReceiver()
函数用于按键值取出Receiver
,如没有,则经过Transport::CreateReceiver()
函数新建一个Receiver
。 这些个Receiver
在Enable()
函数中会经过AddListener()
函数向对应的Dispatcher
注册其回调函数XXXReceiver::OnNewMessage()
。Dispatcher
类中的成员msg_listeners_
是channel id到ListenerHandler
对象的查找表。ListenerHandler
经过signal/slot机制保存了全部这些回调。注意不一样传输后端的AddListener()
实现略有不一样。好比RtpsDispatcher::AddListener()
函数中会将输入的消息先经过ParseFromString()
函数进行解析,而后调用传入的回调。ShmDispatcher::AddListener()
函数也是相似,它会先经过ParseFromArray()
函数解析消息。而对于IntraDispatcher::AddListener()
,因为是同个进程内,是以消息自己的类型传的,就不必解析了。
这些相关结构关系示意图以下:
看了一些关键相关数据结构,接下来看下读端的处理流程。首先,如以前介绍的,各Dispatcher
的继承类各显神通使本身的OnMessage()
回调函数被调用。以RtpsDispatcher
为例:
void RtpsDispatcher::OnMessage(uint64_t channel_id, const std::shared_ptr<std::string>& msg_str, const MessageInfo& msg_info) { if (is_shutdown_.load()) { return; } ListenerHandlerBasePtr* handler_base = nullptr; if (msg_listeners_.Get(channel_id, &handler_base)) { auto handler = std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base); handler->Run(msg_str, msg_info); } }
这里ListenerHandler::Run()
会根据消息的发送者信息找到对应的回调,即Receiver::OnNewMessage()
。
template <typename M> void Receiver<M>::OnNewMessage(const MessagePtr& msg, const MessageInfo& msg_info) { if (msg_listener_ != nullptr) { msg_listener_(msg, msg_info, attr_); } }
这里的回调函数msg_listener_
是在Receiver
建立的时候传入的。其实主要是调用了DataDispatcher::Dispatch()
函数来消息的派发:
transport::Transport::Instance()->CreateReceiver<MessageT>( role_attr, [](const std::shared_ptr<MessageT>& msg, const transport::MessageInfo& msg_info, const proto::RoleAttributes& reader_attr) { (void)msg_info; (void)reader_attr; PerfEventCache::Instance()->AddTransportEvent( TransPerf::DISPATCH, reader_attr.channel_id(), msg_info.seq_num()); data::DataDispatcher<MessageT>::Instance()->Dispatch( reader_attr.channel_id(), msg); PerfEventCache::Instance()->AddTransportEvent( TransPerf::NOTIFY, reader_attr.channel_id(), msg_info.seq_num()); });
DataDisaptcher
是模板类单例,即对于一种特定类型的消息能够共用一个DataDispatcher
。以前在DataVisitor
初始化时会经过AddBuffer()
函数将ChannelBuffer
加入到DataDispatcher
的成员buffers_map_
中。它是一个以channel id为key的map,其value为全部等待该channel上消息的CacheBuffer
的数组。也就是说,消息分发时,只须要根据channel id找到这些buffer,而后将新来的消息填入其中便可。这就是Dispatcher::Dispatch()
函数主要作的事:
template <typename T> bool DataDispatcher<T>::Dispatch(const uint64_t channel_id, const std::shared_ptr<T>& msg) { BufferVector* buffers = nullptr; if (apollo::cyber::IsShutdown()) { return false; } if (buffers_map_.Get(channel_id, &buffers)) { for (auto& buffer_wptr : *buffers) { if (auto buffer = buffer_wptr.lock()) { std::lock_guard<std::mutex> lock(buffer->Mutex()); buffer->Fill(msg); } } } else { return false; } return notifier_->Notify(channel_id); }
最后调用DataNotifier::Notify()
函数来通知新消息的到来。它会触发该channel上全部对应Notifier
中的回调。
inline bool DataNotifier::Notify(const uint64_t channel_id) { NotifyVector* notifies = nullptr; if (notifies_map_.Get(channel_id, ¬ifies)) { for (auto& notifier : *notifies) { if (notifier && notifier->callback) { notifier->callback(); } } return true; } return false; }
这个Notifier
中的回调是在建立协程时经过RegisterNotifyCallback()
函数注册进去的,目的是为了唤醒相应的协程来处理该新消息。
visitor->RegisterNotifyCallback([this, task_id]() { if (cyber_unlikely(stop_.load())) { return; } this->NotifyProcessor(task_id); });
NotifyProcessor()
函数会修改对应协程的状态使之能被调度执行。前面提到,对于n个channel输入的component,会有n+1个协程。它们都是以DataVisitor
和消息回调函数一块儿做为参数建立的。这个协程主体中会调用DataVisitor::TryFetch()
函数拿消息,而后调用注册的消息处理函数:
factory.create_routine = [=]() { return [=]() { std::shared_ptr<M0> msg; for (;;) { CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT); if (dv->TryFetch(msg)) { f(msg); CRoutine::Yield(RoutineState::READY); } else { CRoutine::Yield(); } } }; };
对于那n个消息读取协程来讲,其消息处理函数为:
func = [this](const std::shared_ptr<MessageT>& msg) { this->Enqueue(msg); this->reader_func_(msg); };
这个回调函数中会调用Reader::Enqueue()
函数。在该函数中,主要调用Blocker::Publish()
函数,它继而调用Blocker::Enqueue()
和Blocker::Notify()
函数。Blocker
类是一个存储消息的结构。BlockerManager
类用于管理Blocker
,其中维护了以channel为键值的Blocker
的map。Reader::Enqueue()
函数将消息放到Blocker
的成员published_msg_queue_
队列中。以后,能够经过Blocker::Observe()
函数将成员published_msg_queue_
队列的消息放到成员observed_msg_queue_
队列,而后经过Blocker::GetLatestObserved()
函数获得最新的消息。好比ControlComponent
中的:
chassis_reader_->Observe(); const auto &chassis_msg = chassis_reader_->GetLatestObserved();
而对于剩下那一个协程,它是由主channel来触发的。因它处理的是多channel的组合消息,在协程主体中的TryFetch()
函数会调用AllLatest::Fusion()
函数同时拿多个channel上的最新消息。至于这个组合消息是怎么填入的前面有提。简单来讲,对于它来讲,主channel来消息时,同时也会将其它channel的消息写入组合消息。而后调度协程,拿出组合消息进行处理。其消息处理函数为:
auto func = [self](const std::shared_ptr<M0>& msg0, const std::shared_ptr<M1>& msg1) { auto ptr = self.lock(); if (ptr) { ptr->Process(msg0, msg1); } else { AERROR << "Component object has been destroyed."; } };
该实现中主要以收到的消息为参数调用Component
中的处理函数Process()
,从而执行组件的自定义处理逻辑。
小结
文中提了很多细枝末节,最后很是high-level地归纳下从写者到读者的流程。写者Writer
写消息时,会经过HybridTransmitter
继而使用合适后端的Transmitter
发送消息。根据读与写者间的位置关系,通过网络、共享内存或直接调用的方式,对应后端的Dispatcher
收到消息。收到后转成指定消息类型,交给Receiver
。而后经过DataDispatcher
派发消息。派发消息就是将消息放到对应的buffer中,而后通知相应的协程来做进一步处理。上层模块要取用这些消息,主要两种方式:一种是经过Component
的Proc()
接口,它被调用时参数就是最新的消息。另外一种是经过Reader
的Observe()
函数直接拿。
咱们知道,Apollo在版本3.5前是基于ROS的,同时也对ROS作了几个重要改进。这些改进很多是关于通讯系统的,如共享内存、去中心化和数据兼容性。到Cyber RT的演进也天然延续了这几个优势。总得来讲,Cyber RT基于自动发现机制与Publish-Subscribe模式实现了通讯网络的拓扑管理。同时它对数据传输层作了抽象,下面实现多个后端分别适用于不一样场景,并提供了HYBRID模式能够根据读者和写者间的关系自动使用合适的传输层后端。这样,通讯系统的复杂性就被很好地屏蔽,框架就能提供给应用层便利的开发接口。