在Envoy的代码中Dispatcher
是随处可见的,能够说在Envoy中有着举足轻重的地位,一个Dispatcher
就是一个EventLoop,其承担了任务队列、网络事件处理、定时器、信号处理等核心功能。在Envoy threading model这篇文章所提到的EventLoop
(Each worker thread runs a “non-blocking” event loop
)指的就是这个Dispatcher
对象。这个部分的代码相对较独立,和其余模块耦合也比较少,但重要性却不言而喻。下面是与Dispatcher
相关的类图,在接下来会对其中的关键概念进行介绍。数据库
Dispatcher
本质上就是一个EventLoop,Envoy并无从新实现,而是复用了Libevent中的event_base
,在Libevent的基础上进行了二次封装并抽象出一些事件类,好比FileEvent
、SignalEvent
、Timer
等。Libevent是一个C库,而Envoy是C++,为了不手动管理这些C结构的内存,Envoy经过继承unique_ptr
的方式从新封装了这些libevent暴露出来的C结构。安全
template <class T, void (*deleter)(T*)> class CSmartPtr : public std::unique_ptr<T, void (*)(T*)> { public: CSmartPtr() : std::unique_ptr<T, void (*)(T*)>(nullptr, deleter) {} CSmartPtr(T* object) : std::unique_ptr<T, void (*)(T*)>(object, deleter) {} };
经过CSmartPtr
就能够将Libevent中的一些C数据结构的内存经过RAII机制自动管理起来,使用方式以下:服务器
extern "C" { void event_base_free(event_base*); } struct evbuffer; extern "C" { void evbuffer_free(evbuffer*); } ..... typedef CSmartPtr<event_base, event_base_free> BasePtr; typedef CSmartPtr<evbuffer, evbuffer_free> BufferPtr; typedef CSmartPtr<bufferevent, bufferevent_free> BufferEventPtr; typedef CSmartPtr<evconnlistener, evconnlistener_free> ListenerPtr;
在Libevent中不管是定时器到期、收到信号、仍是文件可读写等都是事件,统一使用event
类型来表示,Envoy中则将event
做为ImplBase
的成员,而后让全部的事件类型的对象都继承ImplBase
,从而实现了事件的抽象。网络
class ImplBase { protected: ~ImplBase(); event raw_event_; };
SignalEvent的实现很简单,经过evsignal_assign
来初始化事件,而后经过evsignal_add
添加事件使事件成为未决状态(关于Libevent事件状态见附录)。数据结构
class SignalEventImpl : public SignalEvent, ImplBase { public: // signal_num: 要设置的信号值 // cb: 信号事件的处理函数 SignalEventImpl(DispatcherImpl& dispatcher, int signal_num, SignalCb cb); private: SignalCb cb_; }; SignalEventImpl::SignalEventImpl(DispatcherImpl& dispatcher, int signal_num, SignalCb cb) : cb_(cb) { evsignal_assign( &raw_event_, &dispatcher.base(), signal_num, [](evutil_socket_t, short, void* arg) -> void { static_cast<SignalEventImpl*>(arg)->cb_(); }, this); evsignal_add(&raw_event_, nullptr); }
Timer事件暴露了两个接口一个用于关闭Timer,另一个则用于启动Timer,须要传递一个时间来设置Timer的到期时间间隔。多线程
class Timer { public: virtual ~Timer() {} virtual void disableTimer() PURE; virtual void enableTimer(const std::chrono::milliseconds& d) PURE; };
建立Timer的时候会经过evtimer_assgin
对event进行初始化,这个时候事件还处于未决状态而不会触发,须要经过event_add
添加到Dispatcher
中才能被触发。socket
class TimerImpl : public Timer, ImplBase { public: TimerImpl(Libevent::BasePtr& libevent, TimerCb cb); // Timer void disableTimer() override; void enableTimer(const std::chrono::milliseconds& d) override; private: TimerCb cb_; }; TimerImpl::TimerImpl(DispatcherImpl& dispatcher, TimerCb cb) : cb_(cb) { ASSERT(cb_); evtimer_assign( &raw_event_, &dispatcher.base(), [](evutil_socket_t, short, void* arg) -> void { static_cast<TimerImpl*>(arg)->cb_(); }, this); }
disableTimer
被调用时其内部会调用event_del
来删除事件,使事件成为非未决状态,enableTimer
被调用时则间接调用event_add
使事件成为未决状态,这样一旦超时时间到了就会触发超时事件。ide
void TimerImpl::disableTimer() { event_del(&raw_event_); } void TimerImpl::enableTimer(const std::chrono::milliseconds& d) { if (d.count() == 0) { event_active(&raw_event_, EV_TIMEOUT, 0); } else { std::chrono::microseconds us = std::chrono::duration_cast<std::chrono::microseconds>(d); timeval tv; tv.tv_sec = us.count() / 1000000; tv.tv_usec = us.count() % 1000000; event_add(&raw_event_, &tv); } }
上面的代码在计算
timer
时间timeval
的时候实现的并不优雅,应该避免使用像1000000
这样的不具有可读性的数字常量,社区中有人建议能够改为以下的形式。函数
auto secs = std::chrono::duration_cast<std::chrono::seconds>(d); auto usecs = std::chrono::duration_cast<std::chrono::microseconds>(d - secs); tv.tv_secs = secs.count(); tv.tv_usecs = usecs.count();
socket
套接字相关的事件被封装为FileEvent
,其上暴露了二个接口:activate
用于主动触发事件,典型的使用场景好比: 唤醒EventLoop、Write Buffer有数据,能够主动触发下可写事件(Envoy中的典型使用场景)等;setEnabled
用于设置事件类型,将事件添加到EventLoop
中使其成为未决状态。oop
void FileEventImpl::activate(uint32_t events) { int libevent_events = 0; if (events & FileReadyType::Read) { libevent_events |= EV_READ; } if (events & FileReadyType::Write) { libevent_events |= EV_WRITE; } if (events & FileReadyType::Closed) { libevent_events |= EV_CLOSED; } ASSERT(libevent_events); event_active(&raw_event_, libevent_events, 0); } void FileEventImpl::setEnabled(uint32_t events) { event_del(&raw_event_); assignEvents(events); event_add(&raw_event_, nullptr); }
Dispatcher
的内部有一个任务队列,也会建立一个线程专们处理任务队列中的任务。经过Dispatcher
的post
方法能够将任务投递到任务队列中,交给Dispatcher
内的线程去处理。
void DispatcherImpl::post(std::function<void()> callback) { bool do_post; { Thread::LockGuard lock(post_lock_); do_post = post_callbacks_.empty(); post_callbacks_.push_back(callback); } if (do_post) { post_timer_->enableTimer(std::chrono::milliseconds(0)); } }
post
方法将传递进来的callback
所表明的任务,添加到post_callbacks_
所表明的类型为vector<callback>
的成员表变量中。若是post_callbacks_
为空的话,说明背后的处理线程是处于非活动状态,这时经过post_timer_
设置一个超时时间时间为0的方式来唤醒它。post_timer_
在构造的时候就已经设置好对应的callback
为runPostCallbacks
,对应代码以下:
DispatcherImpl::DispatcherImpl(TimeSystem& time_system, Buffer::WatermarkFactoryPtr&& factory) : ...... post_timer_(createTimer([this]() -> void { runPostCallbacks(); })), current_to_delete_(&to_delete_1_) { RELEASE_ASSERT(Libevent::Global::initialized(), ""); }
runPostCallbacks
是一个while循环,每次都从post_callbacks_
中取出一个callback
所表明的任务去运行,直到post_callbacks_
为空。每次运行runPostCallbacks
都会确保全部的任务都执行完。显然,在runPostCallbacks
被线程执行的期间若是post
进来了新的任务,那么新任务直接追加到post_callbacks_
尾部便可,而无需作唤醒线程这一动做。
void DispatcherImpl::runPostCallbacks() { while (true) { std::function<void()> callback; { Thread::LockGuard lock(post_lock_); if (post_callbacks_.empty()) { return; } callback = post_callbacks_.front(); post_callbacks_.pop_front(); } callback(); } }
最后讲一下Dispatcher
中比较难理解也很重要的DeferredDeletable
,它是一个空接口,全部要进行延迟析构的对象都要继承自这个空接口。在Envoy的代码中像下面这样继承自DeferredDeletable
的类随处可见。
class DeferredDeletable { public: virtual ~DeferredDeletable() {} };
那何为延迟析构呢?用在哪一个场景呢?延迟析构指的是将析构的动做交由Dispatcher
来完成,因此DeferredDeletable
和Dispatcher
密切相关。Dispatcher
对象有一个vector
保存了全部要延迟析构的对象。
class DispatcherImpl : public Dispatcher { ...... private: ........ std::vector<DeferredDeletablePtr> to_delete_1_; std::vector<DeferredDeletablePtr> to_delete_2_; std::vector<DeferredDeletablePtr>* current_to_delete_; }
to_delete_1_
和to_delete_2_
就是用来存放全部的要延迟析构的对象,这里使用两个vector
存放,为何要这样作呢?。current_to_delete_
始终指向当前正要析构的对象列表,每次执行完析构后就交替指向另一个对象列表,来回交替。
void DispatcherImpl::clearDeferredDeleteList() { ASSERT(isThreadSafe()); std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_; size_t num_to_delete = to_delete->size(); if (deferred_deleting_ || !num_to_delete) { return; } ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete); if (current_to_delete_ == &to_delete_1_) { current_to_delete_ = &to_delete_2_; } else { current_to_delete_ = &to_delete_1_; } deferred_deleting_ = true; for (size_t i = 0; i < num_to_delete; i++) { (*to_delete)[i].reset(); } to_delete->clear(); deferred_deleting_ = false; }
上面的代码在执行对象析构的时候先使用to_delete
来指向当前正要析构的对象列表,而后将current_to_delete_
指向另一个列表,这样在添加延迟删除的对象时,就能够作到安全的把对象添加到列表中了。由于deferredDelete
和clearDeferredDeleteList
都是在同一个线程中运行,因此current_to_delete_
是一个普通的指针,能够安全的更改指针指向另一个,而不用担忧有线程安全问题。
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) { ASSERT(isThreadSafe()); current_to_delete_->emplace_back(std::move(to_delete)); ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size()); if (1 == current_to_delete_->size()) { deferred_delete_timer_->enableTimer(std::chrono::milliseconds(0)); } }
当有要进行延迟析构的对象时,调用deferredDelete
便可,这个函数内部会经过current_to_delete_
把对象放到要延迟析构的列表中,最后判断下当前要延迟析构的列表大小是不是1,若是是1代表这是第一次添加延迟析构的对象,那么就须要经过deferred_delete_timer_
把背后的线程唤醒执行clearDeferredDeleteList
函数。这样作的缘由是避免屡次唤醒,由于有一种状况是线程已经唤醒了正在执行clearDeferredDeleteList
,在这个过程当中又有其余的对象须要析构而加入到vector
中。
到此为止deferredDelete
的实现原理就基本分析完了,能够看出它的实现和任务队列的实现很相似,只不过一个是循环执行callback
所表明的任务,另外一个是对对象进行析构。最后咱们来看一下deferredDelete
的应用场景,却“为什么要进行延迟析构?”在Envoy的源代码中常常会看到像下面这样的代码片断。
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, TransportSocketPtr&& transport_socket, bool connected) { ...... } // 传递裸指针到回调中 file_event_ = dispatcher_.createFileEvent( fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write); ...... }
传递给Dispatcher
的callback
都是经过裸指针的方式进行回调,若是进行回调的时候对象已经析构了,就会出现野指针的问题,我相信C++水平还能够的同窗都会看出这个问题,除非能在逻辑上保证Dispatcher
的生命周期比全部对象都短,这样就能保证在回调的时候对象确定不会析构,可是这不可能成立的,由于Dispatcher
是EventLoop
的核心。
一个线程运行一个EventLoop
直到线程结束,Dispatcher
对象才会析构,这意味着Dispatcher
对象的生命周期是最长的。因此从逻辑上没办法保证进行回调的时候对象没有析构。可能有人会有疑问,对象在析构的时候把注册的事件取消不就能够避免野指针的问题吗? 那若是事件已经触发了,callback
正在等待运行呢? 又或者callback
运行了一半呢?前者libevent是能够保证的,在调用event_del
的时候能够把处于等待运行的事件取消掉,可是后者就无能为力了,这个时候若是对象析构了,那行为就是未定义了。沿着这个思路想想,是否是只要保证对象析构的时候没有callback
正在运行就能够解决问题了呢?是的,只要保证全部在执行中的callback
执行完了,再作对象析构就能够了。能够利用Dispatcher
是顺序执行全部callback
的特色,向Dispatcher
中插入一个任务就是用来对象析构的,那么当这个任务执行的时候是能够保证没有其余任何callback
在运行。经过这个方法就完美解决了这里遇到的野指针问题了。
或许有人又会想,这里是否是能够用shared_ptr和shared_from_this来解这个呢? 是的,这是解决多线程环境下对象析构的秘密武器,经过延长对象的生命周期,把对象的生命周期延长到和callback
同样,等callback
执行完再进行析构,一样能够达到效果,可是这带来了两个问题,第一就是对象生命周期被无限拉长,虽然延迟析构也拉长了生命周期,可是时间是可预期的,一旦EventLoop
执行了clearDeferredDeleteList
任务就会马上被回收,而经过shared_ptr
的方式其生命周期取决于callback
什么时候运行,而callback
什么时候运行这个是没办法保证的,好比一个等待socket
的可读事件进行回调,若是对端一直不发送数据,那么callback
就一直不会被运行,对象就一直没法被析构,长时间累积会致使内存使用率上涨。第二就是在使用方式上侵入性较强,须要强制使用shared_ptr
的方式建立对象。
Dispatcher
总的来讲其实现仍是比较简单明了的,比较容易验证其正确性,一样功能也相对较弱,和chromium的MessageLoop
、boost的asio
都是类似的用途,可是功能上差得比较多。好在这是专门给Envoy设计的,并且Envoy的场景也比较单一,没必要作成那么通用的。另一个我以为比较奇怪的是,为何在DeferredDeletable
的实现中要用to_delete_1_
和to_delete_2_
两个队列交替来存放,其实按照个人理解一个队列便可,由于clearDeferredDeleteList
和deferredDelete
是保证在同一个线程中执行的,就和Dispatcher
的任务队列同样,用一个队列保存全部要执行的任务,循环的执行便可。可是Envoy中没有这样作,我理解这样设计的缘由多是由于相比于任务队列来讲延迟析构的重要性更低一些,大量对象的析构若是保存在一个队列中循环的进行析构势必会影响其余关键任务的执行,因此这里拆分红两个队列,多个任务交替的执行,就比如把一个大任务拆分红了好几个小任务顺序来执行。
双十一广告:阿里云双十一1折拼团活动:已满6人,都是最低折扣了
【满6人】1核2G云服务器99.5元一年298.5元三年 2核4G云服务器545元一年 1227元三年
【满6人】1核1G MySQL数据库 119.5元一年
【满6人】3000条国内短信包 60元每6月
参团地址:http://click.aliyun.com/m/1000020293/