自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的通讯传输

前言

计算框架是自动驾驶系统中的重中之重,也是整个系统得以高效稳定运行的基础。为了实时地完成感知、决策和执行,系统须要一系列的模块相互紧密配合,高效地执行任务流。因为各类缘由,这些模块可能位于不一样进程,也可能位于不一样机器。这就要求计算框架中具备灵活的、高性能的通讯机制。咱们知道,Apollo在3.5版本中推出了Cyber RT替代了原先的ROS。以前写过两篇相关的文章介绍了其中的调度部分:《自动驾驶平台Apollo 3.5阅读手记:Cyber RT中的协程(Coroutine)》《自动驾驶平台Apollo 5.5阅读手记:Cyber RT中的任务调度》。今天就来聊一下其中的另外一重要部分-通讯系统。node

和ROS & ROS2中相似,Cyber RT中支持两种数据交换模式:一种是Publish-Subscribe模式,经常使用于数据流处理中节点间通讯。即发布者(Publisher)在channel(ROS中对应地称为topic)上发布消息,订阅该channel的订阅者(Subscriber)便会收到消息数据;另外一种就是常见的Service-Client模式,经常使用于客户端与服务端的请求与响应。本质上它是能够基于前者实现的。Node是整个数据拓扑网络中的基本单元。一个Node中能够建立多个读者/写者,服务端/客户端。读者和写者分别对应ReaderWriter,用于Publish-Subscribe模式。服务端和客户端分别对应ServiceClient,用于Service-Client模式。后端

实现解析

自动驾驶系统中的各个处理模块基本都是实现为Component。一个Component中包含一个Node,另外会根据须要建立和管理WriterReaderServiceClient。这些用于通讯的类下面基于TrasmitterReceiver类。前者用于数据发送,后者用于数据接收。它们是数据传输层的抽象,之下可有多个传输层实现用于不一样场景下的传输。如对于TrasmitterIntraTransmitterShmTransmitterRtpsTransmitterHybridTransmitter。对于Receiver也是相似的。其中RTPS后端基于Fast RTPS。Fast RTPS是DDS(Data Distribution Service)标准的一个很是流行的开源实现。DDS标准提供了一个平台无关的数据模型,主要用于实时分布式系统。不一样的实现能够相互通讯。整个通讯系统的架构层次图以下。
在这里插入图片描述
数组

下面咱们就从几个方面深刻地看下它们的实现机制。网络

服务发现与拓扑管理

首先来看下比较基础与核心的服务发现与拓扑管理。其实现主要在目录cyber/service_discovery/下。节点间经过读和写端创建数据通路。以channel为边,这样能够获得一个数据流图络。因为节点可能退出,订阅状况也可能发生改变,因此这个网络是动态的。所以须要对网络拓扑进行监控。数据结构

主要负责这件事的数据结构是TopologyManager,它是个单例,由于每一个进程只要有一个来负责监控网络拓扑就能够了。TopologyManager有三个子管理器,并有共同的基类Manager。它们分别为:
  - NodeManager用于管理网络拓扑中的节点。
  - ChannelManager用于管理channel,即网络拓扑中的边。
  - ServiceManager用于管理ServiceClient


架构

Cyber RT中有两个层面的拓扑变化的监控:app

  • 基于Fast RTPS的发现机制

它主要监视网络中是否有参与者加入或退出。TopologyManager::CreateParticipant()函数建立transport::Participant对象时会输入包含host name与process id的名称。ParticipantListener用于监听网络的变化。网络拓扑发生变化时,Fast RTPS传上来ParticipantDiscoveryInfo,在TopologyManager::Convert()函数中对该信息转换成Cyber RT中的数据结构ChangeMsg。而后调用回调函数TopologyManager::OnParticipantChange(),它会调用其它几个子管理器的OnTopoModuleLeave()函数。而后子管理器中即可以将相应维护的信息进行更新(如NodeManager中将相应的节点删除)。框架

这层拓扑监控主要是经过Fast RTPS提供的自动发现机制。如进程意外退出,则要将各管理中相应信息进行更新。它的优势是若是进程出错或设备断开也能够工做,但粒度比较粗,且不是很是及时(好比断开时)。socket

  • 基于主动式的拓扑变动广播

这一部分主要在TopologyManager::Init()函数中建立和初始化。在初始化时,会调用它们的StartDiscovery()函数开始启动自动发现机制。基于TopologyManager中的RtpsParticipant对象,这几个子管理会经过CreateSubscriber()CreatePublisher()函数建立相应的subscriber和publisher。子管理器中channel名称分别为node_change_broadcastchannel_change_broadcastservice_change_broadcast。Subscriber的回调函数为Manager::OnRemoteChange()。该回调函数中会解析拓扑变动消息并调用Dispose()函数进行处理。分布式

这层拓扑监控是主动式的,即须要相应的地方主动调用Join()Leave()来触发,而后各子管理器中回调函数进行信息的更新。如NodeChannelImpl建立时会调用NodeManager::Join()ReaderWriter初始化时会调用JoinTheTopolicy()函数,继而调用ChannelManager::Join()函数。相应地,有LeaveTheTopology()函数表示退出拓扑网络。在这两个函数中,会调用Dispose()函数,而这个函数是虚函数,在各子管理器中有各自的实现。另外Manager提供AddChangeListener()函数注册当拓扑发生变化时的回调函数。举例来讲,Reader::JoinTheTopology()函数中会经过该函数注册回调Reader::OnChannelChange()

数据传输

在一个分布式计算系统中,根据两个节点间的位置关系须要使用不一样的传输方式(定义在CommunicationMode中):
  - INTRA:若是是同进程的,由于在同一地址空间,直接传指针就完了。
  - SHM(Shared memory):若是是同一机器上,但跨进程的,为了高效可使用共享内存。
  - RTPS:若是是跨设备的,那就老老实实经过网络传吧。


示意图以下:
在这里插入图片描述

不少时候一个计算图中各类状况都有,因此为了达到最好的性能,须要混合使用。这种混合模式称为HYBRID模式。框架须要根据节点间关系选择合适的传输后端。

每一个WriterTransmitter,每一个ReaderReceiver。它们是负责消息发送与收取的类。TransmitterReceiver的基类为Endpoint,表明一个通讯的端点,它主要的信息是身份标识与属性。其类型为RoleAttributes(定义在role_attributes.proto)的成员attr_包含了host name,process id和一个根据uuid产生的hash值做为id。经过它们就能够判断节点之间的相对位置关系了。

ReaderWriter会调用Transport的方法CreateTransmitter()CreateReceiver()用于建立发送端的transmitter和接收端的receiver。建立时有四种模式可选,分别是INTRA,SHM和RTPS,和HYBRID。最后一种是前三种的混合模式,也是默认的模式。如Transmitter对应的继承类为IntraTransmitterShmTransmitterRtpsTransmitterHybridTransmitter。这几个继承类最主要实现了Transmit()函数用于传输数据。对于Receiver来讲是相似的,它有4个继承类对应四种传输方式,即IntraReceiverShmReceiverRtpsReceiverHybridReceiver

结合前面提到的几种模式对应的场景,transmitter与receiver的对应关系以下:
在这里插入图片描述

前面提到,传输层实现主要有四个实现后端,对应四种模式:

  • RTPS:RTPS部分基于eProsimar的Fast RTPS。RtpsTransmitter类中建立和封装publisher。Transmit()函数将消息序列化成Fast RTP中的格式UnderlayMessage,而后经过publisher发出去。RtpsReceiver中的dispatcher_成员指向单例RtpsDispatcher。它用于派发RTPS发来的数据,维护了channel id到subscriber的查找表。RtpsDispatcher::AddSubscriber()函数使用eprosima::fastrtps::Domain::createSubscriber()函数建立subscriber,其回调统一为RtpsDispatcher::OnMessage()函数。该函数会将从RTPS通路来的消息进行派发。

  • SHMSegment类表示一块对应一个channel的共享内存,由SegmentFactory::CreateSegment函数建立。它有两个继承类PosixSegmentXsiSegment,是平台相关的实现。在写端,ShmTransmitter::Transmit()函数用于发送消息,该函数先经过AcquireBlockToWrite()函数拿一个可写的block。若是发现该Segment还没有初始化,会调用OpenOrCreate()经过OS的接口建立共享内存而且map出虚拟地址。这块共享内存区域大致分两部分。一部分为元信息,另外一部分为消息数据。后者会被切分为相同大小的block。block的buffer大小默认16K,但赶上消息超出大小的时候会调整。拿到该block后,将消息序列化后写入,并通知读者来取消息。通知机制是经过NotifierBase实现的。它有两个实现类,分别为ConditionNotifierMulticastNotifier。前者为默认设置。它会单独开一块共享共享专门用于通知,其中包含了ReadableInfo等信息。MulticastNotifier的主要区别是它是经过指定的socket广播。在读端,ShmDispatcher::Init()初始化时会建立专门的线程,线程的执行体为ShmDispatcher::Threadfunc()函数。它在循环体内会经过Listen()函数等待新消息。若是有新消息写入后发出通知,这儿就会往下走。基于通知中的ReadableInfo信息,获得channel id,block index等信息,而后调用ReadMessage()函数读消息并反序列化。以后调用ShmDispatcher::OnMessage()函数进行消息派发。

  • INTRA :用于进程内通讯。因为读者和写者是在同一进程内,所以能够直接调用。在IntraTransmitter::Transmit()函数中,会直接调用读端的IntraDispatcher::OnMessage()。该函数进行下一步消息的派发。

  • HYBRID:即默认模式,是前三种的结合体。具体功能其实仍是交给前面几个后端完成的,只是它会根据读者与写者的关系使用相应的后端。

消息写端

写端的实现相对简单一些。在模块组件中,能够经过CreateWriter()函数建立Writer对象,而后就能够经过该对象向指定channel发送消息了。以CameraComponent为例:

writer_ = node_->CreateWriter<Image>(camera_config_->channel_name());
...
auto pb_image = std::make_shared<Image>();                                 
pb_image->mutable_header()->set_frame_id(camera_config_->frame_id());      
pb_image->set_width(raw_image_->width);                                    
pb_image->set_height(raw_image_->height);                                  
pb_image->mutable_data()->reserve(raw_image_->image_size);       
...
writer_->Write(pb_image);

这里先建立了Writer对象,而后填好了消息里的数据(这里发送的消息类型为Image,定义在modules/drivers/proto/sensor_image.proto文件),最后调用Writer::Write()函数将该消息发出。

CreateWriter()函数中先建立Writer对象,再调用Writer::Init()函数进行初始化。初始化中主要经过CreateTransmitter()函数建立Transmitter对象。由于默认是HYBRID模式,因此这里实际建立的是HybridTransmitter对象。Transmitter继承自Endpoint类,它其中的属性信息以用来判断读者与写者的相对关系。不一样的相对关系决定使用何种Transmitter对象。其配置在InitMode()函数中设置:

template <typename M>                                   
void HybridTransmitter<M>::InitMode() {                  
  mode_ = std::make_shared<proto::CommunicationMode>(); 
  mapping_table_[SAME_PROC] = mode_->same_proc();       
  mapping_table_[DIFF_PROC] = mode_->diff_proc();       
  mapping_table_[DIFF_HOST] = mode_->diff_host();       
}

Writer对象的初始化中还会将调用JointTheTopology()函数将之加入到ChannelManager维护的拓扑信息中。

template <typename MessageT>                                              
void Writer<MessageT>::JoinTheTopology() {                                 
  // add listener 
  change_conn_ = channel_manager_->AddChangeListener(std::bind(           
      &Writer<MessageT>::OnChannelChange, this, std::placeholders::_1));  
                                                                          
  // get peer readers 
  const std::string& channel_name = this->role_attr_.channel_name();      
  std::vector<proto::RoleAttributes> readers;                             
  channel_manager_->GetReadersOfChannel(channel_name, &readers);          
  for (auto& reader : readers) {                                           
    transmitter_->Enable(reader);                                         
  }                                                                       
                                                                          
  channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_WRITER,  
                         message::HasSerializer<MessageT>::value);        
}

这里还会作一件比较重要的事是enable相应的Transmitter。先经过ChannelManager获得该channel相应读者的信息。而后对于每一个读者,调用HybridTransmitter::Enable()函数。HybridTransmitter是混合模式的Transmitter,它其实包含了RTPS,SHM和INTRA三种Transmitter实例。但这三种Transmitter并不必定都须要用到。好比,若是该消息对应的读者全是同进程的,那就不必整上SHM和RTPS了。HybridTransmitter::Enable()函数会根据参数来enable合适的Transmitter

template <typename M>
void HybridTransmitter<M>::Enable(const RoleAttributes& opposite_attr) {     
  auto relation = GetRelation(opposite_attr);
  if (relation == NO_RELATION) { 
    return;
  }
  
  uint64_t id = opposite_attr.id();
  std::lock_guard<std::mutex> lock(mutex_);                                 
  receivers_[mapping_table_[relation]].insert(id);                          
  transmitters_[mapping_table_[relation]]->Enable();
  TransmitHistoryMsg(opposite_attr);                                        
}

相应地,在Disable()函数中决定是否要disable相应的Transmitter。这样在以后的Transmit()函数中只要把transmitters_中的全部Transmitter拿出来调用Transmit()函数便可。

发送数据是经过Writer::Write()函数继而调用Transmitter::Transmit()函数来实现的。由于这里是用的HybridTransmitter,所以实际调用的是HybridTransmitter::Transmit()函数:

template <typename M>
bool HybridTransmitter<M>::Transmit(const MessagePtr& msg,
                                    const MessageInfo& msg_info) { 
  std::lock_guard<std::mutex> lock(mutex_);
  history_->Add(msg, msg_info);
  for (auto& item : transmitters_) { 
    item.second->Transmit(msg, msg_info);
  }
  return true;
}

能够看到这里分别调用三大TransmitterTransmit()函数发送消息。

消息读端

读端的处理链路相比下复杂一些。先回顾一个Component中对消息的处理。对于一个Component来讲,它可能会从多个channel收取消息,而后基于全部channel的消息才能处理。第一个channel暂且称之为主channel。这些channel消息的组合咱们暂且称为组合消息。咱们就来看下典型的两个channel状况,其初始化的主要代码为:

template <typename M0, typename M1>                      
bool Component<M0, M1, NullType, NullType>::Initialize(  
  ...
  ReaderConfig reader_cfg;                                                       
  reader_cfg.channel_name = config.readers(1).channel();                         
  reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile());              
  reader_cfg.pending_queue_size = config.readers(1).pending_queue_size();        
                                                                                 
  auto reader1 = node_->template CreateReader<M1>(reader_cfg);                   
                                                                                 
  reader_cfg.channel_name = config.readers(0).channel();                         
  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());              
  reader_cfg.pending_queue_size = config.readers(0).pending_queue_size();        
                                                                                 
  reader0 = node_->template CreateReader<M0>(reader_cfg);         
  ...
  readers_.push_back(std::move(reader0)); 
  readers_.push_back(std::move(reader1)); 
  ...  
  std::vector<data::VisitorConfig> config_list;                                   
  for (auto& reader : readers_) {                                                  
    config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize());    
  }                                                                               
  auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list);             
  croutine::RoutineFactory factory =                                              
      croutine::CreateRoutineFactory<M0, M1>(func, dv);                           
  return sched->CreateTask(factory, node_->Name());                               
}

其中对两个channel分别建立Reader对象。该Reader对象是针对单个channel的。而后针对全部channel建立DataVisitor对象,这时就是针对全部channel的组合消息了。最后建立协程来进行组合数据的处理。后面会看到每一个Reader都会有单独的协程来作数据读取。所以,对于一个有n个channel的component,框架会为此建立至少n+1个协程。

其中比较重要的结构就是用于读取消息的Reader类了。咱们先看Reader对象的建立。其初始化函数Init()以下:

template <typename MessageT>                                                         
bool Reader<MessageT>::Init() {                                                       
  if (init_.exchange(true)) {                                                         
    return true;                                                                     
  }                                                                                  
  std::function<void(const std::shared_ptr<MessageT>&)> func;                        
  if (reader_func_ != nullptr) {                                                      
    func = [this](const std::shared_ptr<MessageT>& msg) {                             
      this->Enqueue(msg);                                                            
      this->reader_func_(msg);                                                       
    };                                                                               
  } else {                                                                            
    func = [this](const std::shared_ptr<MessageT>& msg) {  this->Enqueue(msg); };     
  }                                                                                  
  auto sched = scheduler::Instance();                                                
  croutine_name_ = role_attr_.node_name() + "_" + role_attr_.channel_name();         
  auto dv = std::make_shared<data::DataVisitor<MessageT>>(                           
      role_attr_.channel_id(), pending_queue_size_);                                 
  // Using factory to wrap templates. 
  croutine::RoutineFactory factory =                                                 
      croutine::CreateRoutineFactory<MessageT>(std::move(func), dv);                 
  if (!sched->CreateTask(factory, croutine_name_)) {                                  
    AERROR << "Create Task Failed!";                                                 
    init_.store(false);                                                              
    return false;                                                                    
  }                                                                                  
                                                                                     
  receiver_ = ReceiverManager<MessageT>::Instance()->GetReceiver(role_attr_);        
  this->role_attr_.set_id(receiver_->id().HashValue());                              
  channel_manager_ =                                                                 
      service_discovery::TopologyManager::Instance()->channel_manager();             
  JoinTheTopology();                                                                 
                                                                                     
  return true;                                                                       
}

这里主要建立了相应的DataVisitor类,协程和Receiver类等。其中DataVisitor主要用于消息数据的访问。它存放到来的消息数据,并提供接口供消息读取。仍是以两个channel的状况为例:

template <typename M0, typename M1>                                                  
class DataVisitor<M0, M1, NullType, NullType> : public DataVisitorBase {              
 public:                                                                             
  explicit DataVisitor(const std::vector<VisitorConfig>& configs)                    
      : buffer_m0_(configs[0].channel_id,                                            
                   new BufferType<M0>(configs[0].queue_size)),                       
        buffer_m1_(configs[1].channel_id,                                            
                   new BufferType<M1>(configs[1].queue_size)) {                       
    DataDispatcher<M0>::Instance()->AddBuffer(buffer_m0_);                           
    DataDispatcher<M1>::Instance()->AddBuffer(buffer_m1_);                           
    data_notifier_->AddNotifier(buffer_m0_.channel_id(), notifier_);                 
    data_fusion_ = new fusion::AllLatest<M0, M1>(buffer_m0_, buffer_m1_);            
  }   
  ... 
  bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) {   // NOLINT 
    if (data_fusion_->Fusion(&next_msg_index_, m0, m1)) {                                  
      next_msg_index_++;                                                                  
      return true;                                                                        
    }                                                                                     
    return false;                                                                         
  }                                                                                       
                                                                                          
 private:                                                                                 
  fusion::DataFusion<M0, M1>* data_fusion_ = nullptr;                                     
  ChannelBuffer<M0> buffer_m0_;                                                           
  ChannelBuffer<M1> buffer_m1_;                                                           
};

它的成员变量中对每个channel都有一个对应的ChannelBuffer对象。DataDispatcher::AddBuffer()函数在DataVisitor初始化时用来将这些个ChannelBuffer加入到DataDispatcher的管理中。同时,DataNotifier::AddNotifier()函数用来以主channel的id为键值加入到DataNotifier的管理中。DataDispatcherDataNotifier均为单例。前者为模板类,意味着每个消息类型会有对应的DataDispatcher对象,且相同消息类型会共享该对象。顾名思义,它主要用于数据传输层有数据来时的分发,即当新消息到来时经过DataDispatcher::Dispatch()函数把它放到相应的消息缓冲区中。后者用于管理全部的Notifier。它用于在消息派发完后唤醒相应的协程进行处理。这些对象的大致结构图以下:
在这里插入图片描述

当channel多于一个时(组合消息),DataVisitor中还有一个DataFusion对象用于将多路channel的数据合并。DataFusion的实现类为AllLatest,听名字就知道它会取全部channel中的最新值。除了per-channel的ChannelBuffer对象外,它还有一个特殊的ChannelBuffer对象用于存放多channel消息的组合消息(即各个channel的消息类型的tuple)。当填入主channel的消息时,会调用由SetFusionCallback()函数注册的回调。该回调判断是否全部channel都有消息,若是都有消息的话就将这些消息做为组合消息填入该组合消息的ChannelBuffer中。 在协程处理函数中会调用DataVisitor::TryFetch()函数从该ChannelBuffer中拿组合消息。值得注意的是这件事只在主channel有消息来时才会被触发,所以主channel的选取是有讲究的。

Reader初始化时建立的另外一个关键对象为Receiver。它有4个继承类,默认为混合模式的HybridReceiverHybridReceiver::InitReceivers中分别建立相应的IntraReceiverShmReceiverRtpsReceiver,放在成员receivers_数组中。它会来根据写端的状况来enable和disable相应的ReceiverReceiverManager用于管理这些Receiver对象。它以channel为key进行管理,所以同一进程内订阅同一个channel的会共用同一个Receiver对象。ReceiverManager::GetReceiver()函数用于按键值取出Receiver,如没有,则经过Transport::CreateReceiver()函数新建一个Receiver。 这些个ReceiverEnable()函数中会经过AddListener()函数向对应的Dispatcher注册其回调函数XXXReceiver::OnNewMessage()Dispatcher类中的成员msg_listeners_是channel id到ListenerHandler对象的查找表。ListenerHandler经过signal/slot机制保存了全部这些回调。注意不一样传输后端的AddListener()实现略有不一样。好比RtpsDispatcher::AddListener()函数中会将输入的消息先经过ParseFromString()函数进行解析,而后调用传入的回调。ShmDispatcher::AddListener()函数也是相似,它会先经过ParseFromArray()函数解析消息。而对于IntraDispatcher::AddListener(),因为是同个进程内,是以消息自己的类型传的,就不必解析了。

这些相关结构关系示意图以下:
在这里插入图片描述
看了一些关键相关数据结构,接下来看下读端的处理流程。首先,如以前介绍的,各Dispatcher的继承类各显神通使本身的OnMessage()回调函数被调用。以RtpsDispatcher为例:

void RtpsDispatcher::OnMessage(uint64_t channel_id,
                               const std::shared_ptr<std::string>& msg_str,
                               const MessageInfo& msg_info) { 
  if (is_shutdown_.load()) { 
    return;
  }
    
  ListenerHandlerBasePtr* handler_base = nullptr;
  if (msg_listeners_.Get(channel_id, &handler_base)) { 
    auto handler =
        std::dynamic_pointer_cast<ListenerHandler<std::string>>(*handler_base);
    handler->Run(msg_str, msg_info);
  }
}

这里ListenerHandler::Run()会根据消息的发送者信息找到对应的回调,即Receiver::OnNewMessage()

template <typename M>
void Receiver<M>::OnNewMessage(const MessagePtr& msg,
                               const MessageInfo& msg_info) { 
  if (msg_listener_ != nullptr) { 
    msg_listener_(msg, msg_info, attr_);
  }
}

这里的回调函数msg_listener_是在Receiver建立的时候传入的。其实主要是调用了DataDispatcher::Dispatch()函数来消息的派发:

transport::Transport::Instance()->CreateReceiver<MessageT>(      
    role_attr, [](const std::shared_ptr<MessageT>& msg,          
                  const transport::MessageInfo& msg_info,        
                  const proto::RoleAttributes& reader_attr) {     
      (void)msg_info;                                            
      (void)reader_attr;                                         
      PerfEventCache::Instance()->AddTransportEvent(             
          TransPerf::DISPATCH, reader_attr.channel_id(),         
          msg_info.seq_num());                                   
      data::DataDispatcher<MessageT>::Instance()->Dispatch(      
          reader_attr.channel_id(), msg);                        
      PerfEventCache::Instance()->AddTransportEvent(             
          TransPerf::NOTIFY, reader_attr.channel_id(),           
          msg_info.seq_num());                                   
    });

DataDisaptcher是模板类单例,即对于一种特定类型的消息能够共用一个DataDispatcher。以前在DataVisitor初始化时会经过AddBuffer()函数将ChannelBuffer加入到DataDispatcher的成员buffers_map_中。它是一个以channel id为key的map,其value为全部等待该channel上消息的CacheBuffer的数组。也就是说,消息分发时,只须要根据channel id找到这些buffer,而后将新来的消息填入其中便可。这就是Dispatcher::Dispatch()函数主要作的事:

template <typename T>                                                 
bool DataDispatcher<T>::Dispatch(const uint64_t channel_id,           
                                 const std::shared_ptr<T>& msg) {      
  BufferVector* buffers = nullptr;                                    
  if (apollo::cyber::IsShutdown()) {                                   
    return false;                                                     
  }                                                                   
  if (buffers_map_.Get(channel_id, &buffers)) {                        
    for (auto& buffer_wptr : *buffers) {                               
      if (auto buffer = buffer_wptr.lock()) {                          
        std::lock_guard<std::mutex> lock(buffer->Mutex());            
        buffer->Fill(msg);                                            
      }                                                               
    }                                                                 
  } else {                                                             
    return false;                                                     
  }                                                                   
  return notifier_->Notify(channel_id);                               
}

最后调用DataNotifier::Notify()函数来通知新消息的到来。它会触发该channel上全部对应Notifier中的回调。

inline bool DataNotifier::Notify(const uint64_t channel_id) {   
  NotifyVector* notifies = nullptr;                            
  if (notifies_map_.Get(channel_id, &notifies)) {               
    for (auto& notifier : *notifies) {                          
      if (notifier && notifier->callback) {                     
        notifier->callback();                                  
      }                                                        
    }                                                          
    return true;                                               
  }                                                            
  return false;                                                
}

这个Notifier中的回调是在建立协程时经过RegisterNotifyCallback()函数注册进去的,目的是为了唤醒相应的协程来处理该新消息。

visitor->RegisterNotifyCallback([this, task_id]() {  
  if (cyber_unlikely(stop_.load())) {                
    return;                                         
  }                                                 
  this->NotifyProcessor(task_id);                   
});

NotifyProcessor()函数会修改对应协程的状态使之能被调度执行。前面提到,对于n个channel输入的component,会有n+1个协程。它们都是以DataVisitor和消息回调函数一块儿做为参数建立的。这个协程主体中会调用DataVisitor::TryFetch()函数拿消息,而后调用注册的消息处理函数:

factory.create_routine = [=]() {                                            
    return [=]() {                                                            
      std::shared_ptr<M0> msg;                                               
      for (;;) {                                                              
        CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);   
        if (dv->TryFetch(msg)) {                                              
          f(msg);                                                            
          CRoutine::Yield(RoutineState::READY);                              
        } else {                                                              
          CRoutine::Yield();                                                 
        }                                                                    
      }                                                                      
    };                                                                       
  };

对于那n个消息读取协程来讲,其消息处理函数为:

func = [this](const std::shared_ptr<MessageT>& msg) {     
  this->Enqueue(msg);                                    
  this->reader_func_(msg);                               
};

这个回调函数中会调用Reader::Enqueue()函数。在该函数中,主要调用Blocker::Publish()函数,它继而调用Blocker::Enqueue()Blocker::Notify()函数。Blocker类是一个存储消息的结构。BlockerManager类用于管理Blocker,其中维护了以channel为键值的Blocker的map。Reader::Enqueue()函数将消息放到Blocker的成员published_msg_queue_队列中。以后,能够经过Blocker::Observe()函数将成员published_msg_queue_队列的消息放到成员observed_msg_queue_队列,而后经过Blocker::GetLatestObserved()函数获得最新的消息。好比ControlComponent中的:

chassis_reader_->Observe();
const auto &chassis_msg = chassis_reader_->GetLatestObserved();

而对于剩下那一个协程,它是由主channel来触发的。因它处理的是多channel的组合消息,在协程主体中的TryFetch()函数会调用AllLatest::Fusion()函数同时拿多个channel上的最新消息。至于这个组合消息是怎么填入的前面有提。简单来讲,对于它来讲,主channel来消息时,同时也会将其它channel的消息写入组合消息。而后调度协程,拿出组合消息进行处理。其消息处理函数为:

auto func = [self](const std::shared_ptr<M0>& msg0,         
                    const std::shared_ptr<M1>& msg1) {        
   auto ptr = self.lock();                                   
   if (ptr) {                                                 
     ptr->Process(msg0, msg1);                               
   } else {                                                   
     AERROR << "Component object has been destroyed.";       
   }                                                         
 };

该实现中主要以收到的消息为参数调用Component中的处理函数Process(),从而执行组件的自定义处理逻辑。

小结

文中提了很多细枝末节,最后很是high-level地归纳下从写者到读者的流程。写者Writer写消息时,会经过HybridTransmitter继而使用合适后端的Transmitter发送消息。根据读与写者间的位置关系,通过网络、共享内存或直接调用的方式,对应后端的Dispatcher收到消息。收到后转成指定消息类型,交给Receiver。而后经过DataDispatcher派发消息。派发消息就是将消息放到对应的buffer中,而后通知相应的协程来做进一步处理。上层模块要取用这些消息,主要两种方式:一种是经过ComponentProc()接口,它被调用时参数就是最新的消息。另外一种是经过ReaderObserve()函数直接拿。
在这里插入图片描述

咱们知道,Apollo在版本3.5前是基于ROS的,同时也对ROS作了几个重要改进。这些改进很多是关于通讯系统的,如共享内存、去中心化和数据兼容性。到Cyber RT的演进也天然延续了这几个优势。总得来讲,Cyber RT基于自动发现机制与Publish-Subscribe模式实现了通讯网络的拓扑管理。同时它对数据传输层作了抽象,下面实现多个后端分别适用于不一样场景,并提供了HYBRID模式能够根据读者和写者间的关系自动使用合适的传输层后端。这样,通讯系统的复杂性就被很好地屏蔽,框架就能提供给应用层便利的开发接口。

相关文章
相关标签/搜索