muduo的TimerQueue是基于timerfd_create实现,这样超时很容易和epoll结合起来。等待超时事件保存在set集合中,注意set集合的有序性,从小到大排列,整个对TimerQueue的处理也就是对set集合的操做。实现TimerQueue用了3个set,分别是等待超时事件set,活跃事件set,被撤销定时set。主要是STL的一些操做。算法
TimerQueue.h数组
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) // // This is an internal header file, you should not include this. #ifndef MUDUO_NET_TIMERQUEUE_H #define MUDUO_NET_TIMERQUEUE_H #include <set> #include <vector> #include "muduo/base/Mutex.h" #include "muduo/base/Timestamp.h" #include "muduo/net/Callbacks.h" #include "muduo/net/Channel.h" namespace muduo { namespace net { class EventLoop; class Timer; class TimerId; /// /// A best efforts timer queue. /// No guarantee that the callback will be on time. /// class TimerQueue : noncopyable { public: explicit TimerQueue(EventLoop* loop); ~TimerQueue(); /// /// Schedules the callback to be run at given time, /// repeats if @c interval > 0.0. /// /// Must be thread safe. Usually be called from other threads. /* * 用于注册定时任务 * @param cb, 超时调用的回调函数 * @param when,超时时间(绝对时间) * @interval,是不是周期性超时任务 */ TimerId addTimer(TimerCallback cb, Timestamp when, double interval); /* 取消定时任务,每一个定时任务都有对应的TimerId,这是addTimer返回给调用者的 */ void cancel(TimerId timerId); private: // FIXME: use unique_ptr<Timer> instead of raw pointers. // This requires heterogeneous comparison lookup (N3465) from C++14 // so that we can find an T* in a set<unique_ptr<T>>. /* * 主要用于删除操做,经过TimerId找到Timer*,再经过Timer*找到在timers_中的位置,将期删除 * 以为能够省略 */ typedef std::pair<Timestamp, Timer*> Entry; typedef std::set<Entry> TimerList; typedef std::pair<Timer*, int64_t> ActiveTimer; typedef std::set<ActiveTimer> ActiveTimerSet; void addTimerInLoop(Timer* timer); void cancelInLoop(TimerId timerId); // called when timerfd alarms /* 当timerfd被激活时调用的回调函数,表示超时 */ void handleRead(); // move out all expired timers /* 从timers_中拿出全部超时的Timer* */ std::vector<Entry> getExpired(Timestamp now); /* 将超时任务中周期性的任务从新添加到timers_中 */ void reset(const std::vector<Entry>& expired, Timestamp now); /* 插入到timers_中 */ bool insert(Timer* timer); EventLoop* loop_; /* 所属的事件驱动循环 */ const int timerfd_; /* 由timerfd_create建立的文件描述符 */ Channel timerfdChannel_; /* 用于监听timerfd的Channel */ // Timer list sorted by expiration TimerList timers_; /* 保存全部的定时任务 */ // for cancel() ActiveTimerSet activeTimers_; bool callingExpiredTimers_; /* atomic */ ActiveTimerSet cancelingTimers_; //保存被取消的定时器 }; } // namespace net } // namespace muduo #endif // MUDUO_NET_TIMERQUEUE_H
TimerQueue.ccide
// Copyright 2010, Shuo Chen. All rights reserved. // http://code.google.com/p/muduo/ // // Use of this source code is governed by a BSD-style license // that can be found in the License file. // Author: Shuo Chen (chenshuo at chenshuo dot com) #ifndef __STDC_LIMIT_MACROS #define __STDC_LIMIT_MACROS #endif #include "muduo/net/TimerQueue.h" #include "muduo/base/Logging.h" #include "muduo/net/EventLoop.h" #include "muduo/net/Timer.h" #include "muduo/net/TimerId.h" #include <sys/timerfd.h> #include <unistd.h> namespace muduo { namespace net { namespace detail { int createTimerfd() { int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0) { LOG_SYSFATAL << "Failed in timerfd_create"; } return timerfd; } struct timespec howMuchTimeFromNow(Timestamp when)//如今距离超时时间when还有多久 { int64_t microseconds = when.microSecondsSinceEpoch() - Timestamp::now().microSecondsSinceEpoch(); if (microseconds < 100) { microseconds = 100; } struct timespec ts; ts.tv_sec = static_cast<time_t>( microseconds / Timestamp::kMicroSecondsPerSecond); ts.tv_nsec = static_cast<long>( (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000); return ts; } void readTimerfd(int timerfd, Timestamp now)//处理超时事件。超时后,timerfd变为可读 { uint64_t howmany; ssize_t n = ::read(timerfd, &howmany, sizeof howmany); LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString(); if (n != sizeof howmany) { LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8"; } } //timerfd是时间对于的文件描述符 void resetTimerfd(int timerfd, Timestamp expiration)//从新设置定时器 { // wake up loop by timerfd_settime() struct itimerspec newValue; struct itimerspec oldValue; memZero(&newValue, sizeof newValue); memZero(&oldValue, sizeof oldValue); newValue.it_value = howMuchTimeFromNow(expiration); int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); if (ret) { LOG_SYSERR << "timerfd_settime()"; } } } // namespace detail } // namespace net } // namespace muduo using namespace muduo; using namespace muduo::net; using namespace muduo::net::detail; TimerQueue::TimerQueue(EventLoop* loop) : loop_(loop), timerfd_(createTimerfd()), timerfdChannel_(loop, timerfd_), timers_(), callingExpiredTimers_(false) { timerfdChannel_.setReadCallback( std::bind(&TimerQueue::handleRead, this)); // we are always reading the timerfd, we disarm it with timerfd_settime. timerfdChannel_.enableReading(); } TimerQueue::~TimerQueue() { timerfdChannel_.disableAll(); timerfdChannel_.remove(); ::close(timerfd_); // do not remove channel, since we're in EventLoop::dtor(); for (const Entry& timer : timers_) { delete timer.second; } } TimerId TimerQueue::addTimer(TimerCallback cb, Timestamp when, double interval) { Timer* timer = new Timer(std::move(cb), when, interval); /* * 在本身所属线程调用addTimerInLoop函数 */ loop_->runInLoop( std::bind(&TimerQueue::addTimerInLoop, this, timer)); return TimerId(timer, timer->sequence()); } void TimerQueue::cancel(TimerId timerId) { loop_->runInLoop( std::bind(&TimerQueue::cancelInLoop, this, timerId)); } /* 向计时器队列中添加超时事件 */ void TimerQueue::addTimerInLoop(Timer* timer) { loop_->assertInLoopThread(); bool earliestChanged = insert(timer);//返回true,说明timer被添加到set的顶部,做为新的根节点,须要更新timerfd的激活时间 // 只有在计时器为空的时候或者新加入的计时器的最先触发时间小于当前计时器的堆顶的最小值 // 才须要用最近时间去更新 if (earliestChanged) { resetTimerfd(timerfd_, timer->expiration()); } } void TimerQueue::cancelInLoop(TimerId timerId) { loop_->assertInLoopThread(); assert(timers_.size() == activeTimers_.size()); ActiveTimer timer(timerId.timer_, timerId.sequence_); ActiveTimerSet::iterator it = activeTimers_.find(timer); if (it != activeTimers_.end())//要取消的在当前激活的Timer集合中 { size_t n = timers_.erase(Entry(it->first->expiration(), it->first));//在timers_中取消 assert(n == 1); (void)n; delete it->first; // FIXME: no delete please activeTimers_.erase(it);//在activeTimers_中取消 } else if (callingExpiredTimers_)//若是正在执行超时定时器的回调函数,则加入到cancelingTimers集合中 { cancelingTimers_.insert(timer); } assert(timers_.size() == activeTimers_.size()); } /* * 当定时器超时,保存timerfd的Channel激活,调用回调函数 */ void TimerQueue::handleRead() { loop_->assertInLoopThread(); Timestamp now(Timestamp::now()); readTimerfd(timerfd_, now); /* 从定时任务set中拿出全部超时任务 */ std::vector<Entry> expired = getExpired(now); callingExpiredTimers_ = true; cancelingTimers_.clear(); // safe to callback outside critical section /* 调用超时的事件回调函数 */ for (const Entry& it : expired) { it.second->run(); } callingExpiredTimers_ = false; reset(expired, now); } /* * 从新整理时间set中的任务,将全部超时的任务都拿出,而后调用其回调函数 */ std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now) { assert(timers_.size() == activeTimers_.size()); std::vector<Entry> expired; Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); //返回第一个大于等于now的迭代器,小于now的都已经超时 //lower_bound( begin,end,num):从数组的begin位置到end-1位置二分查找第一个大于或等于num的数字,找到返回该数字的地址 //lower_bound(val):返回容器中第一个值【大于或等于】val的元素的iterator位置 TimerList::iterator end = timers_.lower_bound(sentry); assert(end == timers_.end() || now < end->first); /* back_inserter:容器适配器,将数据插入到参数的尾部 */ //一个序列(sequence)拷贝到一个容器(container)中去,一般用std::copy算法 std::copy(timers_.begin(), end, back_inserter(expired)); timers_.erase(timers_.begin(), end); //从timers_中移除 for (const Entry& it : expired) { ActiveTimer timer(it.second, it.second->sequence()); size_t n = activeTimers_.erase(timer); //从activeTimers_中移除 assert(n == 1); (void)n; } assert(timers_.size() == activeTimers_.size()); return expired; } /* * 调用完全部超时的回调函数后,须要对这些超时任务进行整理 * 将周期性的定时任务从新添加到set中 */ void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now) { Timestamp nextExpire; for (const Entry& it : expired) { ActiveTimer timer(it.second, it.second->sequence()); if (it.second->repeat() /* 是不是周期性的定时任务 */ && cancelingTimers_.find(timer) == cancelingTimers_.end()) { /* 从新计算超时时间 */ it.second->restart(now); /* 从新添加到set中 */ insert(it.second); } else { // FIXME move to a free list delete it.second; // FIXME: no delete please } } /* 计算下次timerfd被激活的时间 */ if (!timers_.empty()) { nextExpire = timers_.begin()->second->expiration();//set从小到大排序 } /* 设置 */ if (nextExpire.valid())//时间是有效的 { resetTimerfd(timerfd_, nextExpire); } } bool TimerQueue::insert(Timer* timer) { loop_->assertInLoopThread(); assert(timers_.size() == activeTimers_.size()); bool earliestChanged = false; /* 获取timer的UTC时间戳,和timer组成std::pair<Timestamp, Timer*> */ Timestamp when = timer->expiration(); /* timers_begin()是set顶层元素(红黑树根节点),是超时时间最近的Timer* */ TimerList::iterator it = timers_.begin(); /* 若是要添加的timer的超时时间比timers_中的超时时间近,更改新的超时时间 */ if (it == timers_.end() || when < it->first) { earliestChanged = true; } { /* 添加到定时任务的set中 */ std::pair<TimerList::iterator, bool> result = timers_.insert(Entry(when, timer)); assert(result.second); (void)result; } { /* 同时也添加到activeTimers_中,用于删除时查找操做 */ std::pair<ActiveTimerSet::iterator, bool> result = activeTimers_.insert(ActiveTimer(timer, timer->sequence())); assert(result.second); (void)result; } assert(timers_.size() == activeTimers_.size()); return earliestChanged; }