1、Envoy的工做模式bootstrap
Envoy的工做模式如图所示,横向是管理平面。后端
Envoy会暴露admin的API,能够经过API查看Envoy中的路由或者集群的配置。api
例如经过curl http://127.0.0.1:15000/routes能够查看路由的配置,结果以下图,请记住路由的配置层级,后面在代码中会看到熟悉的数据结构。缓存
routes下面有virtual_hosts,里面有带prefix的route,里面是route entry,里面是weight cluster,对于不一样的cluster不一样的路由权重,再里面是cluster的列表,有cluster的名称和权重。性能优化
再如经过curl http://127.0.0.1:15000/clusters能够获得集群也即cluster的配置,这里面是真正cluster的信息。网络
在另一面,Envoy会调用envoy API去pilot里面获取路由和集群的配置,envoy API的详情能够查看https://www.envoyproxy.io/docs/envoy/v1.8.0/api-v2/http_routes/http_routes中的API文档,能够看到route的配置详情,也是按照上面的层级组织的。数据结构
当Envoy从pilot获取到路由和集群信息以后,会保存在内存中,当有个客户端要链接后端的时候,客户端处于Downstream,后端处于Upstream,当数据从Downstream流向Upstream的时候,会经过Filter,根据路由和集群的配置,选择后端的应用创建链接,将请求转发出去。架构
接下来咱们来看Envoy是如何实现这些的。并发
2、Envoy的关键数据结构的建立app
Envoy的启动会在main函数中建立Envoy::MainCommon,在它的构造函数中,会调用父类MainCommonBase的构造函数,建立Server::InstanceImpl,接下来调用InstanceImpl::initialize(...)
接下来就进入关键的初始化阶段。
加载初始化配置,里面配置了Listener Discover Service, Router Discover Service, Cluster Discover Service等。
InstanceUtil::loadBootstrapConfig(bootstrap_, options);
建立AdminImpl,从而能够接受请求接口
admin_.reset(new AdminImpl(initial_config.admin().accessLogPath(), initial_config.admin().profilePath(), *this));
建立ListenerManagerImpl,用于管理监听,由于Downstream要访问Upstream的时候,envoy会进行监听,Downstream会链接监听的端口。
listener_manager_.reset( new ListenerManagerImpl(*this, listener_component_factory_, worker_factory_, time_system_));
在ListenerManagerImpl的构造函数中,建立了不少的worker,Envoy采用libevent监听socket的事件,当有一个新的链接来的时候,会将任务分配给某个worker进行处理,从而实现异步的处理。
ListenerManagerImpl::ListenerManagerImpl(...) { for (uint32_t i = 0; i < server.options().concurrency(); i++) { workers_.emplace_back(worker_factory.createWorker()); } }
createWorker会建立WorkerImpl,初始化WorkerImpl须要两个重要的参数。
一个是allocateDispatcher建立出来的DispatcherImpl,用来封装libevent的事件分发的。
一个是ConnectionHandlerImpl,用来管理一个链接的。
咱们接着看初始化过程,建立ProdClusterManagerFactory,用于建立Cluster Manager,管理上游的集群。
cluster_manager_factory_.reset(new Upstream::ProdClusterManagerFactory(...);
在ProdClusterManagerFactory会建立ClusterManagerImpl,在ClusterManagerImpl的构造函数中,若是配置了CDS,就须要订阅Cluster Discover Service。
if (bootstrap.dynamic_resources().has_cds_config()) { cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this); init_helper_.setCds(cds_api_.get()); }
会调用ProdClusterManagerFactory::createCds,里面会建立CdsApiImpl。在CdsApiImpl的构造函数中,会建立CdsSubscription,订阅CDS,当集群的配置发生变化的时候,会调用CdsApiImpl::onConfigUpdate(...)
接下来在InstanceImpl::initialize的初始化函数中,若是配置了LDS,就须要订阅Listener Discover Service。
if (bootstrap_.dynamic_resources().has_lds_config()) { listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config()); }
ListenerManagerImpl的createLdsApi会调用ProdListenerComponentFactory的createLdsApi函数,会建立LdsApiImpl。在LdsApiImpl的构造函数中,会建立LdsSubscription,订阅LDS,当Listener的配置改变的时候,会调用LdsApiImpl::onConfigUpdate(...)
3、Envoy的启动
MainCommonBase::run() 会调用InstanceImpl::run(),会调用InstanceImpl::startWorkers(),会调用ListenerManagerImpl::startWorkers。
startWorkers会将Listener添加到全部的worker中,而后启动worker的线程。
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) { workers_started_ = true; for (const auto& worker : workers_) { for (const auto& listener : active_listeners_) { addListenerToWorker(*worker, *listener); } worker->start(guard_dog); } }
对于每个WorkerImpl,会调用ConnectionHandlerImpl的addListener函数,会建立ActiveListener对象,ActiveListener很重要,他的函数会在libevent收到事件的时候被调用。
在ActiveListener的构造函数中,会调用相同Worker的DispatcherImpl的createListener,建立Network::ListenerImpl,注意这个类的namespace,由于对于Envoy来说,LDS里面有个Listener的概念,可是Socket也有Listener的概念,在Network这个namespace下面的,是对socket和libevent的封装。
在Network::ListenerImpl的构造函数中,当收到事件的时候,会调用注册的listenCallback函数。
if (bind_to_port) { listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));
在listenCallback函数中,会调用onAccept函数,这个函数是ConnectionHandlerImpl::ActiveListener::onAccept函数。
listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address), listener->hand_off_restored_destination_connections_);
4、Envoy从Pilot中获取Listener
当Listener的配置有变化的时候,会调用LdsApiImpl::onConfigUpdate(...)。
会调用ListenerManagerImpl的addOrUpdateListener(...)函数,在这里面,会建立一个ListenerImpl,
这里的listener是LDS定义的Listener,而非网络的Listener了。
在ListenerImpl的构造函数中,首先会建立ListenerFilter,能够对监听的socket进行定制化的配置,例如为了实现use_original_dst,就须要加入一个ListenerFilter。
为了建立ListenerFilter,先要调用ProdListenerComponentFactory::createListenerFilterFactoryList_来建立工厂。
建立完了ListenerFilter以后,为了对于进来的网络包进行处理,会建立NetworkFilter,正是这些NetworkFilter实现了对网络包的解析,并根据网络包的内容,实现路由和负载均衡策略。
为了建立NetworkFilter,先要调用ProdListenerComponentFactory::createNetworkFilterFactoryList_来建立工厂,在这个函数里面,会遍历全部的NamedNetworkFilterConfigFactory,也即起了名字的filter都过一遍,都有哪些呢?class NetworkFilterNameValues里面有定义,这里面最重要的是:
// HTTP connection manager filter const std::string HttpConnectionManager = "envoy.http_connection_manager";
咱们这里重点看HTTP协议的转发,咱们重点看这个名字对应的HttpConnectionManagerFilterConfigFactory,并调用他的createFilterFactory函数,返回一个Network::FilterFactoryCb类型的callback函数。
5、Envoy订阅RDS
HttpConnectionManagerFilterConfigFactory的createFilterFactory函数最终调用createFilterFactoryFromProtoTyped函数中,会建立
Router::RouteConfigProviderManagerImpl。
std::shared_ptr<Router::RouteConfigProviderManager> route_config_provider_manager = context.singletonManager().getTyped <Router::RouteConfigProviderManager>( SINGLETON_MANAGER_REGISTERED_NAME(route_config_provider_manager), [&context] { return std::make_shared<Router::RouteConfigProviderManagerImpl>(context.admin()); });
建立完毕Router::RouteConfigProviderManagerImpl以后,会建立HttpConnectionManagerConfig,将Router::RouteConfigProviderManagerImpl做为成员变量传入。
std::shared_ptr<HttpConnectionManagerConfig> filter_config(new HttpConnectionManagerConfig(proto_config, context, *date_provider, *route_config_provider_manager));
在HttpConnectionManagerConfig的构造函数中,调用Router::RouteConfigProviderUtil::create。
route_config_provider_ = Router::RouteConfigProviderUtil::create(config, context_, stats_prefix_,route_config_provider_manager_);
Router::RouteConfigProviderUtil::create会调用RouteConfigProviderManagerImpl::createRdsRouteConfigProvider,在这个函数里面,会建立RdsRouteConfigSubscription订阅RDS,而后建立RdsRouteConfigProviderImpl。
当Router的配置发生变化的时候,会调用RdsRouteConfigProviderImpl::onConfigUpdate(),在这个函数里面,会从pilot获取Route的配置,生成ConfigImpl对象。
当咱们仔细分析ConfigImpl的时候,咱们发现,这个数据结构和第一节Envoy中的路由配置是同样的。
在ConfigImpl里面有RouteMatcher用于匹配路由,在RouteMatcher里面有VirtualHostImpl,在VirtualHostImpl里面有RouteEntryImplBase,其中一种RouteEntry是WeightedClusterEntry,在WeightedClusterEntry里面有cluster_name_和cluster_weight_。
到此RouteConfigProviderManagerImpl如何订阅RDS告一段落,咱们回到HttpConnectionManagerFilterConfigFactory的createFilterFactoryFromProtoTyped中,在这个函数的最后一部分,将返回Network::FilterFactoryCb,是一个callback函数,做为createFilterFactoryFromProtoTyped的返回值。
在这个callback函数中,会建立Http::ConnectionManagerImpl,将HttpConnectionManagerConfig做为成员变量传入,Http::ConnectionManagerImpl是一个Network::ReadFilter。在Envoy里面,有Network::WriteFilter和Network::ReadFilter,其中从Downstream发送到Upstream的使用Network::ReadFilter对网络包进行处理,反方向的是使用Network::WriteFilter。
callback函数建立ConnectionManagerImpl以后调用filter_manager.addReadFilter,将ConnectionManagerImpl放到ReadFilter列表中。固然这个callback函数如今是不调用的。
createFilterFactoryFromProtoTyped的返回值callback函数会返回到ProdListenerComponentFactory::createNetworkFilterFactoryList_函数,这个函数返回一个callback函数列表,其中一个就是上面生成的这个函数。
再返回就回到了ListenerImpl的构造函数,他会调用addFilterChain,将ProdListenerComponentFactory::createNetworkFilterFactoryList_返回的callback函数列表加入到链里面。
6、当一个新的链接创建的时候
经过前面的分析,咱们制定当一个新的链接创建的时候,会触发libevent的事件,最终调用ConnectionHandlerImpl::ActiveListener::onAccept函数。
在这个函数中,会调用使用上面createListenerFilterFactoryList_建立的ListenerFilter的工厂建立ListenerFilter,而后使用这些Filter进行处理。
// Create and run the filters config_.filterChainFactory().createListenerFilterChain(*active_socket); active_socket->continueFilterChain(true);
ConnectionHandlerImpl::ActiveSocket::continueFilterChain函数调用ConnectionHandlerImpl::ActiveListener::newConnection,建立一个新的链接。
在ConnectionHandlerImpl::ActiveListener::newConnection函数中,先是会调用DispatcherImpl::createServerConnection,建立Network::ConnectionImpl,在Network::ConnectionImpl的构造函数里面:
file_event_ = dispatcher_.createFileEvent( fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
对于一个新的socket链接,对于操做系统来说是一个文件,也即有个文件描述符,当一个socket可读的时候,会触发一个事件,当一个socket可写的时候,能够触发另外一个事件,发生事件后,调用onFileEvent。
在ConnectionHandlerImpl::ActiveListener::newConnection函数中,接下来就应该为这个链接建立NetworkFilter了。
config_.filterChainFactory().createNetworkFilterChain( *new_connection, filter_chain->networkFilterFactories());
上面这段代码,会调用ListenerImpl::createNetworkFilterChain。
里面调用Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager, const std::vector<Network::FilterFactoryCb>& factories) { for (const Network::FilterFactoryCb& factory : factories) { factory(filter_manager); } return filter_manager.initializeReadFilters(); }
能够看出这里会调用上面生成的callback函数,将ConnectionManagerImpl放到ReadFilter列表中。
7、当有新的数据到来的时候
当Downstream发送数据到Envoy的时候,socket就处于可读的状态,于是ConnectionImpl::onFileEvent函数会被调用,当事件是Event::FileReadyType::Read的时候,调用ConnectionImpl::onReadReady()。
在ConnectionImpl::onReadReady()函数里面,socket将数据读入缓存。
IoResult result = transport_socket_->doRead(read_buffer_);
而后调用ConnectionImpl::onRead,里面调用filter_manager_.onRead()使用NetworkFilter对于数据进行处理。
FilterManagerImpl::onRead() 会调用FilterManagerImpl::onContinueReading有下面的逻辑。
for (; entry != upstream_filters_.end(); entry++) { BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer(); if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) { FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream); if (status == FilterStatus::StopIteration) { return; } } }
对于每个Filter,都调用onData函数,我们上面解析过,其中HTTP对应的ReadFilter是ConnectionManagerImpl,于是调用ConnectionManagerImpl::onData函数。
8、对数据进行解析
ConnectionManagerImpl::onData函数中,首先建立数据解析器。
codec_ = config_.createCodec(read_callbacks_->connection(), data, *this);
调用HttpConnectionManagerConfig::createCodec,对于HTTP1,建立Http::Http1::ServerConnectionImpl。
而后对数据进行解析。
codec_->dispatch(data);
调用ConnectionImpl::dispatch,里面调用ConnectionImpl::dispatchSlice,在这里面对HTTP进行解析。
<font size="3">http_parser_execute(&parser_, &settings_, slice, len);</font>
解析的参数是settings_,能够看出这里面解析url,而后解析header,结束后调用onHeadersCompleteBase()函数,而后解析正文。
因为路由是根据HTTP头里面的信息来的,于是咱们重点看ConnectionImpl::onHeadersCompleteBase(),里面会调用ServerConnectionImpl::onHeadersComplete函数。
active_request_->request_decoder_->decodeHeaders(std::move(headers), false);
ServerConnectionImpl::onHeadersComplete里面调用ConnectionManagerImpl::ActiveStream::decodeHeaders这到了路由策略的重点。
这里面调用了两个函数,一个是refreshCachedRoute()刷新路由配置,并查找到匹配的路由项Route Entry。
另外一个是ConnectionManagerImpl::ActiveStream::decodeHeaders的另外一个实现。
decodeHeaders(nullptr, *request_headers_, end_stream);
在这里会经过Route Entry找到后端的集群,并创建链接。
9、路由匹配
咱们先来解析refreshCachedRoute()
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_); request_info_.route_entry_ = route ? route->routeEntry() : nullptr; cached_route_ = std::move(route); }
snapped_route_config_是什么呢?snapped_route_config_的初始化以下。
snapped_route_config_(connection_manager.config_.routeConfigProvider().config())
routeConfigProvider()返回的就是RdsRouteConfigProviderImpl,其config函数返回的就是ConfigImpl,也就是上面咱们描述过的层级的数据结构。
ConfigImpl的route函数以下
RouteConstSharedPtr route(const Http::HeaderMap& headers, uint64_t random_value) const override { return route_matcher_->route(headers, random_value); }
RouteMatcher的route函数,根据headers.Host()查找virtualhost,而后调用 VirtualHostImpl::getRouteFromEntries。
RouteConstSharedPtr RouteMatcher::route(const Http::HeaderMap& headers, uint64_t random_value) const { const VirtualHostImpl* virtual_host = findVirtualHost(headers); if (virtual_host) { return virtual_host->getRouteFromEntries(headers, random_value); } else { return nullptr; } }
VirtualHostImpl::getRouteFromEntries函数里面有下面的循环
for (const RouteEntryImplBaseConstSharedPtr& route : routes_) { RouteConstSharedPtr route_entry = route->matches(headers, random_value); if (nullptr != route_entry) { return route_entry; } }
对于每个RouteEntry,看是否匹配。例如经常使用的有PrefixRouteEntryImpl::matches,看前缀是否一致。
RouteConstSharedPtr PrefixRouteEntryImpl::matches(const Http::HeaderMap& headers,uint64_t random_value) const { if (RouteEntryImplBase::matchRoute(headers, random_value) && StringUtil::startsWith(headers.Path()->value().c_str(), prefix_, case_sensitive_)) { return clusterEntry(headers, random_value); } return nullptr; }
获得匹配的Route Entry后,调用RouteEntryImplBase::clusterEntry获取一个Cluster的名字。对于WeightedCluster,会调用下面的函数。
WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,true);
在这个函数里面,会根据权重选择一个WeightedClusterEntry返回。
这个时候,咱们获得了后面Cluster,也即集群的名称,接下来须要获得集群的具体的IP地址并创建链接。
10、查找集群并创建链接
在此我向你们推荐一个架构学习交流群。交流学习群号:821169538 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多。
ConnectionManagerImpl::ActiveStream::decodeHeaders的另外一个实现会调用Route.cc里面的Filter::decodeHeaders。
在Filter::decodeHeaders函数中有如下的实现。
// A route entry matches for the request. route_entry_ = route_->routeEntry(); Upstream::ThreadLocalCluster* cluster = config_.cm_.get(route_entry_->clusterName());
经过上一节的查找,WeightedClusterEntry里面有cluster的名称,接下来就是从cluster manager里面根据cluster名称查找到cluster的信息。
cluster manager就是咱们最初建立的ClusterManagerImpl
ThreadLocalCluster* ClusterManagerImpl::get(const std::string& cluster) { ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>(); auto entry = cluster_manager.thread_local_clusters_.find(cluster); if (entry != cluster_manager.thread_local_clusters_.end()) { return entry->second.get(); } else { return nullptr; } }
返回ClusterInfoImpl,是cluster的信息。
cluster_ = cluster->info();
找到Cluster后,开始创建链接。
// Fetch a connection pool for the upstream cluster. Http::ConnectionPool::Instance* conn_pool = getConnPool();
route.cc的Filter::getConnPool()里面调用如下函数。
config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), route_entry_->priority(), protocol, this);
ClusterManagerImpl::httpConnPoolForCluster调用ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool函数。
在这个函数里面,HostConstSharedPtr host = lb_->chooseHost(context);经过负载均衡,在一个cluster里面选择一个后端的机器创建链接。