muduo
的Reactor
模式主要有3个类实现-Channel、Poller、EventLoopweb
首先,咱们从简单的作起,一步一步构建思惟脑图,理解Muduo
核心结构编程
Channel
是selectable IO channel
,自始至终只负责一个 fd 的(注册与响应) IO 事件,可是不拥有该 fd ,因此也就在析构的时候不关闭它.数组
来来来,先喊三遍口号:安全
如何工做:网络
在Channel类中保存这IO事件的类型以及对应的回调函数,当IO事件发生时,最终会调用到Channel类中的回调函数多线程
具体流程以下:架构
首先给定Channel
所属的 loop
,及其要处理的 fd;接着注册 fd 上须要监听的事件,若是是经常使用的读写事件的话,能够直接调用接口函数enableReading
或enableWriting
来注册对应fd上的事件,disable*是销毁指定的事件;而后经过 setCallback
来设置事件发生时的回调便可app
注册事件时函数调用关系,以下:Channel::update()->EventLoop::updateChannel(Channel*)->Poller::updateChannel(Channel*)
,最终向 poll 系统调用的监听事件表注册或修改事件。框架
class EventLoop; /// /// A selectable I/O channel. /// /// This class doesn't own the file descriptor. /// The file descriptor could be a socket, /// an eventfd, a timerfd, or a signalfd class Channel { public: typedef std::function<void()> EventCallback; Channel(EventLoop *loop, int fd) : loop_(loop), fd_(fdArg), events_(0), revents_(0), index_(-1) { } //处理事件,通常由Poller经过EventLoop来调用 //eventloop中有一个vector<Channel *>的 activeChannels_ 活动数组,天然能够调用它 //根据revents_ 的事件类型,执行相应的回调 void handleEvent() { if (revents_ & POLLNVAL) { LOG_WARN << "Channel::handle_event() POLLNVAL"; } if (revents_ & (POLLERR | POLLNVAL)) { if (errorCallback_) errorCallback_(); } if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) { if (readCallback_) readCallback_(); } if (revents_ & POLLOUT) { if (writeCallback_) writeCallback_(); } } void setReadCallback(const EventCallback &cb) { readCallback_ = cb; } void setWriteCallback(const EventCallback &cb) { writeCallback_ = cb; } void setErrorCallback(const EventCallback &cb) { errorCallback_ = cb; } int fd() const { return fd_; } /* 返回 fd 注册的事件 */ int events() const { return events_; } void set_revents(int revt) { revents_ = revt; } bool isNoneEvent() const { return events_ == kNoneEvent; } void enableReading() { events_ |= kReadEvent; update(); } // void enableWriting() { events_ |= kWriteEvent; update(); } // void disableWriting() { events_ &= ~kWriteEvent; update(); } // void disableAll() { events_ = kNoneEvent; update(); } // for Epoller int index() { return index_; } void set_index(int idx) { index_ = idx; } EventLoop *ownerLoop() { return loop_; } private: /* 经过调用loop_->updateChannel()来注册或改变本fd在epoll中监听的事件 */ void update() { loop_->updateChannel(this); } static const int kNoneEvent=0; static const int kReadEvent=POLLIN | POLLPRI; static const int kWriteEvent =POLLOUT; EventLoop *loop_; const int fd_; int events_; //关注的事件 int revents_; //poll/epoll中返回的事件 int index_; // used by epoller. 表示在poll的事件数组中的序号 EventCallback readCallback_; EventCallback writeCallback_; EventCallback errorCallback_; };
// 在咱们这里将其直接写为一个具体类socket
Poller是个基类,具体能够是EPollPoller(默认) 或者PollPoller,对应 poll 和 epoll.须要去实现(惟一使用面向对象的一个类)
这里咱们再来喊三遍口号:
具体处理流程就是:
poll
函数调用 epoll_wait/poll
来监听注册了的文件描述符,而后经过fillActiveChannels
函数将返回的就绪事件装入 activeChannels
数组
Poller,h
namespace muduo { class Channel; /// /// IO Multiplexing with poll(2). /// /// This class doesn't own the Channel objects. class Poller : boost::noncopyable { public: typedef std::vector<Channel*> ChannelList; Poller(EventLoop* loop); //须要传入EventLoop Objetct ~Poller(); /// Polls the I/O events. /// Must be called in the loop thread. // 核心功能,调用 epoll_wait/poll 来监听注册了的文件描述符 /* Channel::update()->EventLoop::updateChannel(Channel* channel)->Poller::updateChannel(Channel* channel)*/ Timestamp poll(int timeoutMs, ChannelList* activeChannels); /// Changes the interested I/O events. /// Must be called in the loop thread. //负责维护和更新 pollfds_ 数组 void updateChannel(Channel* channel); /* 断言 确保没有跨线程 */ void assertInLoopThread() { ownerLoop_->assertInLoopThread(); } private: //真正填充 activeChannels 的函数 void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; typedef std::vector<struct pollfd> PollFdList; typedef std::map<int, Channel*> ChannelMap;//key是文件描述符,value是Channel* EventLoop* ownerLoop_; PollFdList pollfds_; ChannelMap channels_; }; }
Poller,cc
Poller::Poller(EventLoop* loop) : ownerLoop_(loop) { } Poller::~Poller() { } Timestamp Poller::poll(int timeoutMs, ChannelList* activeChannels) { // XXX pollfds_ shouldn't change int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs); Timestamp now(Timestamp::now()); if (numEvents > 0) { LOG_TRACE << numEvents << " events happended"; fillActiveChannels(numEvents, activeChannels); } else if (numEvents == 0) { LOG_TRACE << " nothing happended"; } else { LOG_SYSERR << "Poller::poll()"; } return now; } void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const { for (PollFdList::const_iterator pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd) { if (pfd->revents > 0) { --numEvents; ChannelMap::const_iterator ch = channels_.find(pfd->fd); assert(ch != channels_.end()); Channel* channel = ch->second; assert(channel->fd() == pfd->fd); channel->set_revents(pfd->revents); // pfd->revents = 0; activeChannels->push_back(channel); } } } void Poller::updateChannel(Channel* channel) { assertInLoopThread(); LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events(); if (channel->index() < 0) { // muduo 使用了 index 去让channel 记住他在pollfds_的下标 // index < 0说明是一个新的通道 // a new one, add to pollfds_ assert(channels_.find(channel->fd()) == channels_.end()); struct pollfd pfd; pfd.fd = channel->fd(); pfd.events = static_cast<short>(channel->events()); pfd.revents = 0; pollfds_.push_back(pfd); int idx = static_cast<int>(pollfds_.size())-1; channel->set_index(idx); channels_[pfd.fd] = channel; } else { // update existing one assert(channels_.find(channel->fd()) != channels_.end()); assert(channels_[channel->fd()] == channel); int idx = channel->index(); assert(0 <= idx && idx < static_cast<int>(pollfds_.size())); struct pollfd& pfd = pollfds_[idx]; assert(pfd.fd == channel->fd() || pfd.fd == -1); pfd.events = static_cast<short>(channel->events()); pfd.revents = 0; // 将一个通道暂时更改成不关注事件,但不从Poller中移除该通道 if (channel->isNoneEvent()) { // ignore this pollfd pfd.fd = -1; } } }
EventLoop
类是Reactor
模式的核心,一个线程一个事件循环,即one loop per thread
,EventLoop
对象的生命周期一般与其所属的线程同样长。EventLoop对象构造的时候,会检查当前线程是否已经建立了其余EventLoop对象,若是已建立,终止程序(LOG_FATAL),EventLoop类的构造函数会记录本对象所属线程(threadld_),建立了EventLoop对象的线程称为IO线程.其主要功能是运行事件循环,等待事件发生,而后调用回调处理发生的事件。EventLoop::loop() -> Poller::poll()
填充就绪事件集合 activeChannels
,而后遍历该容器,执行每一个 channel
的 Channel::handleEvent()
完成对应就绪事件回调。
EventLoop.h
class Channel; class Poller; class EventLoop : boost::noncopyable { public: EventLoop(); // force out-line dtor, for scoped_ptr members. ~EventLoop(); /// /// Loops forever. /// 核心 /// Must be called in the same thread as creation of the object. ///**`EventLoop::loop() -> Poller::poll() `填充就绪事件集合 `activeChannels`, //而后遍历该容器,执行每一个 `channel `的 `Channel::handleEvent()` // 完成对应就绪事件回调。** void loop(); void quit(); // internal use only void updateChannel(Channel* channel); // void removeChannel(Channel* channel); void assertInLoopThread() { if (!isInLoopThread()) { abortNotInLoopThread(); } } bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } private: void abortNotInLoopThread(); typedef std::vector<Channel*> ChannelList; bool looping_; /* atomic */ bool quit_; /* atomic */ const pid_t threadId_; //线程ID boost::scoped_ptr<Poller> poller_; ChannelList activeChannels_; }; }
using namespace muduo; __thread EventLoop* t_loopInThisThread = 0; const int kPollTimeMs = 10000; EventLoop::EventLoop() : looping_(false), quit_(false), threadId_(CurrentThread::tid()), poller_(new Poller(this)) { LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this; } } EventLoop::~EventLoop() { assert(!looping_); t_loopInThisThread = NULL; } void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; while (!quit_) { activeChannels_.clear(); poller_->poll(kPollTimeMs, &activeChannels_); for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { (*it)->handleEvent(); } } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; } void EventLoop::quit() { quit_ = true; // wakeup(); } void EventLoop::updateChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); poller_->updateChannel(channel); } void EventLoop::abortNotInLoopThread() { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " << CurrentThread::tid(); }
使用以上三个类.来看一个简单的例子:
#include "Channel.h" #include "EventLoop.h" #include <stdio.h> #include <sys/timerfd.h> muduo::EventLoop *g_loop; void timeout() { printf("Timeout!\n"); g_loop->quit(); } int main() { muduo::EventLoop loop; g_loop = &loop; int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);//建立一个新的定时器对象 muduo::Channel channel(&loop, timerfd); channel.setReadCallback(timeout); channel.enableReading(); struct itimerspec howlong; bzero(&howlong, sizeof howlong); howlong.it_value.tv_sec = 5; ::timerfd_settime(timerfd, 0, &howlong, NULL); loop.loop(); ::close(timerfd); }
程序利用timerfd
实现一个单词触发的定时器.channel
将该timerfd
上的可读事件转发给了timerout函数
如下摘自陈硕的博客:
Linux 的计时函数,用于得到当前时间:
定时函数,用于让程序等待一段时间或安排计划任务: sleep alarm usleep nanosleep clock_nanosleep getitimer / setitimer timer_create / timer_settime / timer_gettime / timer_delete timerfd_create / timerfd_gettime / timerfd_settime
个人取舍以下:
PS:C++的time_t
是 int32_t
表示秒数,时间不许.
timerfd_* 入选的缘由:
sleep / alarm / usleep 在实现时有可能用了信号 SIGALRM,在多线程程序中处理信号是个至关麻烦的事情,应当尽可能避免。(近期我会写一篇博客仔细讲讲“多线程、RAII、fork() 与信号”)
nanosleep 和 clock_nanosleep 是线程安全的,可是在非阻塞网络编程中,绝对不能用让线程挂起的方式来等待一段时间,程序会失去响应。正确的作法是注册一个时间回调函数。
getitimer 和 timer_create 也是用信号来 deliver 超时,在多线程程序中也会有麻烦。timer_create 能够指定信号的接收方是进程仍是线程,算是一个进步,不过在信号处理函数(signal handler)能作的事情实在很受限。
timerfd_create
把时间变成了一个文件描述符,该“文件”在定时器超时的那一刻变得可读,这样就能很方便地融入到 select/poll
框架中,用统一的方式来处理 IO 事件和超时事件,这也正是 Reactor
模式的长处。
传统的 Reactor 利用 select/poll/epoll 的 timeout 来实现定时功能,但 poll 和 epoll 的定时精度只有毫秒,远低于 timerfd_settime 的定时精度。
惟一标识一个 Timer 定时器。TimerId Class 同时保存Timer* 和 sequence_,这个 sequence_ 是每一个 Timer 对象有一个全局递增的序列号 int64_t sequence_,用原子计数器(AtomicInt64)生成
它主要用于注销定时器,int64_t sequence_ 能够区分地址相同的前后两个 Timer 对象。下面代码先忽略int64_t sequence_,也就是先不实现cancel 的接口,咱们在这里只理解其工做方式便可
TimreId 类
namespace muduo { class Timer; /// /// An opaque identifier, for canceling Timer. /// class TimerId : public muduo::copyable { public: explicit TimerId(Timer *timer) : value_(timer) { } // default copy-ctor, dtor and assignment are okay private: Timer *value_; };
封装了定时器的一些参数,包括超时时间(expiration_)、超时回调函数(callback_)、时间间隔(interval_)、是否重复定时(repeat_)、定时器的序列号等成员变量,成员函数大都是返回这些变量的值,run() 用来调用回调函数,restart() 用来重启定时器。
Timer.h
namespace muduo { /// /// Internal class for timer event. /// class Timer : boost::noncopyable { public: Timer(const TimerCallback& cb, Timestamp when, double interval) : callback_(cb), expiration_(when), interval_(interval), repeat_(interval > 0.0) { } void run() const { callback_();//执行定时器回调函数 } Timestamp expiration() const { return expiration_; } /* 是否周期性定时 */ bool repeat() const { return repeat_; } /* 重启定时器 */ void restart(Timestamp now) { if (repeat_) { //若是须要重复,那就将时间设为下次超时的时间 expiration_ = addTime(now, interval_); } else { //若是不须要重复,那就将超时时间设为一个不可用的 value expiration_ = Timestamp::invalid(); } } private: const TimerCallback callback_; //回调函数 Timestamp expiration_; //时间戳 const double interval_;//时间间隔,若是是一次性定时器,该值为0 const bool repeat_;//是否重复执行 }; }
这里muduo使用的是下面的结构去管理的定时器的
typedef std::pair<Timestamp, Timer*> Entry; typedef std::set<Entry> TimerList;
他说是multimap
应为不经常使用被抛弃了,唉,对于这点,也是醉了
TimerQueue 定时器容器类中须要处理两个 Timer 的超时时间相同的问题,因此能够用multimap
经过给 timerfd
一个超时时间实现超时计时,它内部有 Channel
,经过 Channel
管理 timerfd
,而后向EventLoop
和Poller
注册 timerfd
的可读事件,当 timerfd
的可读事件就绪时代表一个超时时间点到了,而后Channle对象timerfdChannel_
调用可读事件回调 handleRead()
,经过 getExpired()
找出全部的超时事件,而后执行相应的超时回调函数 Timer::run()
。为了复用定时器,每次处理完以后,会检查这些超时定时器是否须要重复定时,若是须要重复,就再次添加到定时器集合中。
timerfd 如何实现多个定时器超时计时的呢?就是在插入的时候与set 元素比较,而后更新timerfd,从而保证 timerfd 始终是 set 中最近的一个超时时间.当 timerfd 可读时,仍是须要遍历容器,由于有可能此时有多个 Timer 超时了(尽管 tiemrfd 是当前最小的超时时间).唉,何须这么麻烦呐,直接用时间堆管理很差吗?timerfd == 堆顶,不过,我学到的是仍是须要遍历容器(堆)的
TimerQueue.h
namespace muduo { class EventLoop; class Timer; class TimerId; /// /// A best efforts timer queue. /// No guarantee that the callback will be on time. /// class TimerQueue : boost::noncopyable { public: TimerQueue(EventLoop* loop); ~TimerQueue(); /// /// Schedules the callback to be run at given time, /// repeats if @c interval > 0.0. /// /// Must be thread safe. Usually be called from other threads. TimerId addTimer(const TimerCallback& cb, Timestamp when, double interval); // void cancel(TimerId timerId); private: // FIXME: use unique_ptr<Timer> instead of raw pointers. typedef std::pair<Timestamp, Timer*> Entry; typedef std::set<Entry> TimerList; void addTimerInLoop(Timer* timer); // called when timerfd alarms void handleRead(); // move out all expired timers std::vector<Entry> getExpired(Timestamp now); void reset(const std::vector<Entry>& expired, Timestamp now); bool insert(Timer* timer); EventLoop* loop_; const int timerfd_; Channel timerfdChannel_; // Timer list sorted by expiration TimerList timers_; }; } #endif // MUDUO_NET_TIMERQUEUE_H
EventLoop加入三个函数:runAt()、runAfter()、runEvery()。通通转而调用TimerQueue::addTimer
muduo中有一个EventLoop::runInLoop
函数,用来在其余线程中唤醒IO线程(就是建立并运行了EventLoop的线程),可是在协程里面应该是用不到,因此暂时不接触这一点
EventLoop.h
#include "datetime/Timestamp.h" #include "thread/Thread.h" #include "Callbacks.h" #include "TimerId.h" #include <boost/scoped_ptr.hpp> #include <vector> namespace muduo { class Channel; class Poller; class TimerQueue; class EventLoop : boost::noncopyable { public: EventLoop(); // force out-line dtor, for scoped_ptr members. ~EventLoop(); /// /// Loops forever. /// /// Must be called in the same thread as creation of the object. /// void loop(); void quit(); /// /// Time when poll returns, usually means data arrivial. /// Timestamp pollReturnTime() const { return pollReturnTime_; } // timers /// /// Runs callback at 'time'. /// TimerId runAt(const Timestamp& time, const TimerCallback& cb); /// /// Runs callback after @c delay seconds. /// TimerId runAfter(double delay, const TimerCallback& cb); /// /// Runs callback every @c interval seconds. /// TimerId runEvery(double interval, const TimerCallback& cb); // void cancel(TimerId timerId); // internal use only void updateChannel(Channel* channel); // void removeChannel(Channel* channel); void assertInLoopThread() { if (!isInLoopThread()) { abortNotInLoopThread(); } } bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } private: void abortNotInLoopThread(); typedef std::vector<Channel*> ChannelList; bool looping_; /* atomic */ bool quit_; /* atomic */ const pid_t threadId_; Timestamp pollReturnTime_; boost::scoped_ptr<Poller> poller_; boost::scoped_ptr<TimerQueue> timerQueue_; ChannelList activeChannels_; }; } #endif // MUDUO_NET_EVENTLOOP_H
using namespace muduo; __thread EventLoop* t_loopInThisThread = 0; const int kPollTimeMs = 10000; EventLoop::EventLoop() : looping_(false), quit_(false), threadId_(CurrentThread::tid()), poller_(new Poller(this)), timerQueue_(new TimerQueue(this)) { LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_; if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this; } } EventLoop::~EventLoop() { assert(!looping_); t_loopInThisThread = NULL; } void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { (*it)->handleEvent(); } } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; } void EventLoop::quit() { quit_ = true; // wakeup(); } TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb) { return timerQueue_->addTimer(cb, time, 0.0); } TimerId EventLoop::runAfter(double delay, const TimerCallback& cb) { Timestamp time(addTime(Timestamp::now(), delay)); return runAt(time, cb); } TimerId EventLoop::runEvery(double interval, const TimerCallback& cb) { Timestamp time(addTime(Timestamp::now(), interval)); return timerQueue_->addTimer(cb, time, interval); } void EventLoop::updateChannel(Channel* channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); poller_->updateChannel(channel); } void EventLoop::abortNotInLoopThread() { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " << CurrentThread::tid(); }