muduo is a multithreaded network library for Linux developed by Chen Shuo (http://www.cnblogs.com/Solstice). It makes use of several newer Linux features (such as eventfd and timerfd) as well as GCC built-in functions. Its main features are:
AtomicIntegerT: built on the atomic-operation built-in functions provided by GCC (see the sketch below).
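A minimal sketch of the kind of wrapper that sits on top of these built-ins (illustrative only, not a verbatim copy of muduo's Atomic.h; the member names are assumptions):

// Illustrative atomic integer wrapper built on GCC __sync builtins.
template<typename T>
class AtomicIntegerT
{
 public:
  AtomicIntegerT() : value_(0) {}

  T get()                 { return __sync_val_compare_and_swap(&value_, T(0), T(0)); }
  T getAndAdd(T x)        { return __sync_fetch_and_add(&value_, x); }
  T addAndGet(T x)        { return getAndAdd(x) + x; }
  T incrementAndGet()     { return addAndGet(1); }
  T getAndSet(T newValue) { return __sync_lock_test_and_set(&value_, newValue); }

 private:
  volatile T value_;
};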
BlockingQueue: a thread-safe queue, implemented internally with std::deque<T>.
BoundedBlockingQueue: similar to BlockingQueue, but the internal container is a boost::circular_buffer<T>.
Condition: a wrapper around pthread_cond.
CountDownLatch: like a starting gun; a further wrapper over Condition that can make a group of threads start at the same time.
Exception: a wrapper class around backtrace and backtrace_symbols.
MutexLock: a wrapper class around pthread_mutex_*.
void* Thread::startThread(void* obj)
{
  Thread* thread = static_cast<Thread*>(obj);
  thread->runInThread();  // wrapper around func_; this is where func_ gets called
  return NULL;
}
void Thread::runInThread()
{
  tid_ = CurrentThread::tid();
  muduo::CurrentThread::t_threadName = name_.c_str();
  try
  {
    func_();
    muduo::CurrentThread::t_threadName = "finished";
  }
  …
}
typedef boost::function<void ()> ThreadFunc;

Thread::Thread(const ThreadFunc& func, const string& n)
  : started_(false),
    pthreadId_(0),
    tid_(0),
    // func_ is the function that will actually run in the thread,
    // wrapped by boost::function into a function object (functor)
    func_(func),
    name_(n)
{
  numCreated_.increment();
}
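A minimal usage sketch of the Thread class (assuming the usual muduo interface of start()/join(); threadFunc and the include paths here are illustrative):

#include <muduo/base/Thread.h>
#include <muduo/base/CurrentThread.h>
#include <boost/bind.hpp>
#include <cstdio>

void threadFunc()  // hypothetical worker function
{
  printf("tid=%d\n", muduo::CurrentThread::tid());
}

int main()
{
  muduo::Thread t(boost::bind(&threadFunc), "worker");
  t.start();   // spawns the pthread, which ends up in runInThread()
  t.join();    // waits for threadFunc to return
  return 0;
}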
ThreadLocal: still uses pthread_getspecific/pthread_setspecific (OP: why not use the __thread keyword? Presumably because __thread only works for POD data and does not run destructors for arbitrary objects, which pthread_key destructors do provide).
ThreadLocalSingleton: the per-thread singleton pattern; the instance member of the singleton template class is declared with the __thread keyword, giving it thread-local storage.
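A bare-bones sketch of the idea (simplified; the real ThreadLocalSingleton also arranges, via a pthread key, to destroy the object at thread exit):

// Per-thread singleton built on __thread (illustrative, no destruction at thread exit).
template<typename T>
class ThreadLocalSingleton
{
 public:
  static T& instance()
  {
    if (t_value_ == NULL)
    {
      t_value_ = new T();   // each thread lazily creates its own copy
    }
    return *t_value_;
  }

 private:
  static __thread T* t_value_;  // __thread is fine here: a raw pointer is POD
};

template<typename T>
__thread T* ThreadLocalSingleton<T>::t_value_ = NULL;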
void ThreadPool::run(const Task& task)
{
  if (threads_.empty())
  {
    task();   // no worker threads: run the task directly in the caller
  }
  else
  {
    MutexLockGuard lock(mutex_);
    queue_.push_back(task);   // enqueue the task
    cond_.notify();
  }
}
ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  while (queue_.empty() && running_)
  {
    cond_.wait();   // no task available, wait
  }
  Task task;
  if (!queue_.empty())
  {
    task = queue_.front();
    queue_.pop_front();
  }
  return task;
}
// this is the thread function of every worker
void ThreadPool::runInThread()
{
  try
  {
    while (running_)
    {
      Task task(take());   // every worker fetches tasks from here
      if (task)
      {
        task();            // run the task
      }
    }
  }
  …
}
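A minimal usage sketch (assuming the usual muduo ThreadPool interface of start(numThreads)/run(task)/stop(); printTask and the waiting trick are illustrative):

#include <muduo/base/ThreadPool.h>
#include <muduo/base/CountDownLatch.h>
#include <boost/bind.hpp>
#include <cstdio>

void printTask(int i)   // hypothetical task
{
  printf("task %d\n", i);
}

int main()
{
  muduo::ThreadPool pool("demo");
  pool.start(4);                            // spawn 4 workers, each running runInThread()
  for (int i = 0; i < 100; ++i)
  {
    pool.run(boost::bind(&printTask, i));   // enqueue work; workers pick it up in take()
  }
  muduo::CountDownLatch latch(1);
  pool.run(boost::bind(&muduo::CountDownLatch::countDown, &latch));
  latch.wait();                             // crude way to wait until the queue drains
  pool.stop();
  return 0;
}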
If writable < datalen but prependable + writable >= datalen, readerIndex is moved back to the front so that the prependable and writable regions merge into a single area large enough for the data (this situation typically arises because there is still unread data and readerIndex has drifted backwards). If prependable + writable < datalen, even the combined writable space is insufficient, so the buffer is grown with vector::resize().
void makeSpace(size_t len)
{
  if (writableBytes() + prependableBytes() < len + kCheapPrepend)
  {
    // FIXME: move readable data
    buffer_.resize(writerIndex_ + len);
  }
  else
  {
    // move readable data to the front, make space inside buffer
    assert(kCheapPrepend < readerIndex_);
    size_t readable = readableBytes();
    std::copy(begin() + readerIndex_,
              begin() + writerIndex_,
              begin() + kCheapPrepend);
    readerIndex_ = kCheapPrepend;
    writerIndex_ = readerIndex_ + readable;
    assert(readable == readableBytes());
  }
}
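From the user's side, a small usage sketch of the Buffer class (the method names follow muduo's Buffer.h; the byte counts are illustrative):

#include <muduo/net/Buffer.h>
#include <string>
#include <cassert>

int main()
{
  muduo::net::Buffer buf;
  buf.append("hello, muduo", 12);        // grows the writable region, via makeSpace() if needed
  assert(buf.readableBytes() == 12);

  std::string head(buf.peek(), 5);       // look at data without consuming it
  buf.retrieve(5);                       // consume 5 bytes: readerIndex_ moves forward
  assert(buf.readableBytes() == 7);

  std::string rest = buf.retrieveAllAsString();  // consume the remainder
  assert(buf.readableBytes() == 0);
  (void)head; (void)rest;
  return 0;
}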
class Channel : boost::noncopyable
{
 public:
  typedef boost::function<void()> EventCallback;
  typedef boost::function<void(Timestamp)> ReadEventCallback;

 private:
  EventLoop* loop_;              // which reactor this channel belongs to
  const int fd_;                 // the fd it is associated with
  int events_;                   // events we are interested in
  int revents_;                  // events that are ready
  bool eventHandling_;           // currently handling events
  ReadEventCallback readCallback_;
  EventCallback writeCallback_;  // how to write data
  EventCallback closeCallback_;  // how to close the connection
  EventCallback errorCallback_;  // how to handle errors
};
When the loop detects events on this channel, the handleEvent callback is triggered:
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}
class EventLoop : boost::noncopyable
{
 public:
  void loop();
  void quit();

  /// Runs callback immediately in the loop thread.
  /// It wakes up the loop, and runs the cb.
  /// If in the same loop thread, cb is run within the function.
  /// Safe to call from other threads.
  void runInLoop(const Functor& cb);

  /// Queues callback in the loop thread.
  /// Runs after finish polling.
  /// Safe to call from other threads.
  void queueInLoop(const Functor& cb);

  /// Runs callback at 'time'.
  /// Safe to call from other threads.
  TimerId runAt(const Timestamp& time, const TimerCallback& cb);

  /// Runs callback after @c delay seconds.
  /// Safe to call from other threads.
  TimerId runAfter(double delay, const TimerCallback& cb);

  /// Runs callback every @c interval seconds.
  /// Safe to call from other threads.
  TimerId runEvery(double interval, const TimerCallback& cb);

  /// Cancels the timer.
  /// Safe to call from other threads.
  void cancel(TimerId timerId);

  // internal usage
  void wakeup();
  void updateChannel(Channel* channel);
  void removeChannel(Channel* channel);

  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

 private:
  void handleRead();  // waked up
  void doPendingFunctors();

  typedef std::vector<Channel*> ChannelList;

  bool looping_;                 /* atomic */
  bool quit_;                    /* atomic */
  bool eventHandling_;           /* atomic */
  bool callingPendingFunctors_;  /* atomic */
  const pid_t threadId_;
  Timestamp pollReturnTime_;
  boost::scoped_ptr<Poller> poller_;
  boost::scoped_ptr<TimerQueue> timerQueue_;
  int wakeupFd_;
  // unlike in TimerQueue, which is an internal class,
  // we don't expose Channel to client.
  boost::scoped_ptr<Channel> wakeupChannel_;
  ChannelList activeChannels_;
  Channel* currentActiveChannel_;
  MutexLock mutex_;
  std::vector<Functor> pendingFunctors_;  // @GuardedBy mutex_
};

__thread EventLoop* t_loopInThisThread = 0;
t_loopInThisThread is defined as a per-thread global variable and is initialized in EventLoop's constructor.
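The constructor snippet is not reproduced in these notes; roughly, it does the following (a simplified sketch, not a verbatim quote of EventLoop.cc):

EventLoop::EventLoop()
  : looping_(false),
    threadId_(CurrentThread::tid())
    // ... other members omitted ...
{
  if (t_loopInThisThread)
  {
    // another EventLoop already exists in this thread: one loop per thread is enforced
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
}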
epoll's default working mode is LT (level-triggered).
Looking at muduo's working model, one could use an IO-thread-per-fd scheme to handle each connection's read/write/encode/decode work, with the threads of a computation pool listening on an eventfd; when a computation thread is woken, the connection and the decoded packet are passed into the pool as arguments, and once the computation finishes the result is written back directly to the IO thread's fd. The next computation thread is chosen in round-robin fashion.
A different solution: in fact these threads can be merged; it only depends on the nature of the tasks, IO-bound or CPU-bound. The only constraint is that, to avoid the performance loss of excessive thread context switches and the resource limits on thread count, one cannot use a thread-per-fd model; instead the fds are divided into groups and distributed fairly evenly across the IO threads.
Cross-thread wakeup of an EventLoop:
EventLoop::EventLoop()
  : wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_))
{
  wakeupChannel_->setReadCallback(
      boost::bind(&EventLoop::handleRead, this));  // bound to handleRead
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();
}
The function that performs the cross-thread wakeup is wakeup():
void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);  // much like writing directly into a pipe
}
Once wakeup() completes, wakeupFd_ becomes readable, so the EventLoop is notified and immediately returns from epoll_wait to start processing. Of course the data on wakeupFd_ must be read out, otherwise the loop would be notified again on the next iteration; the function that reads it is handleRead:
void EventLoop::handleRead()
{
  uint64_t one = 1;
  ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
}
runInLoop and queueInLoop are how cross-thread tasks are submitted.
void EventLoop::runInLoop(const Functor& cb)
{
  if (isInLoopThread())
  {
    cb();             // called from the loop's own thread: run it immediately
  }
  else
  {
    queueInLoop(cb);  // called from another thread: append it to pendingFunctors_
    wakeup();         // and notify the loop thread that a task has arrived
  }
}
void EventLoop::queueInLoop(const Functor& cb)
{
  {
    MutexLockGuard lock(mutex_);
    pendingFunctors_.push_back(cb);
  }
  /* After queueing, wake up only if we are in the loop's own thread and it is
     currently executing pendingFunctors_; otherwise the callback will simply be
     picked up in the next iteration, so there is no need to wake up. */
  if (isInLoopThread() && callingPendingFunctors_)
  {
    wakeup();
  }
}
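For completeness, doPendingFunctors() (declared above but not quoted in these notes) roughly works as follows: it swaps the pending list out under the lock and runs the callbacks outside the critical section, which is why queueInLoop can safely be called from inside a pending functor. A simplified sketch:

void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;
  {
    MutexLockGuard lock(mutex_);
    functors.swap(pendingFunctors_);  // grab the whole batch, then release the lock
  }
  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();                    // run callbacks outside the critical section
  }
  callingPendingFunctors_ = false;
}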
Call chain:
addTimer(const TimerCallback& cb, Timestamp when, double interval) => addTimerInLoop(Timer* timer) => insert(timer). Inside insert():
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;

bool earliestChanged = false;
Timestamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
if (it == timers_.end() || when < it->first)
{
  earliestChanged = true;
}
The subtle point here: if this is the very first timer, begin() == end(), so earliestChanged = true, which in turn triggers resetTimerfd:
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  bool earliestChanged = insert(timer);

  if (earliestChanged)
  {
    // calls ::timerfd_settime(timerfd, 0, &newValue, &oldValue) to arm the timer
    resetTimerfd(timerfd_, timer->expiration());
  }
}
When the timer fires:
void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);

  // find out which timers have expired
  std::vector<Entry> expired = getExpired(now);

  // safe to callback outside critical section
  for (std::vector<Entry>::iterator it = expired.begin();
       it != expired.end(); ++it)
  {
    // for every expired Timer, call run(), i.e. the callback registered at the start
    it->second->run();
  }
  reset(expired, now);
}
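Putting the timer interface together, a small usage sketch (the function names follow the EventLoop header quoted above; the print callback and the delays are made up):

#include <muduo/net/EventLoop.h>
#include <boost/bind.hpp>
#include <cstdio>

void print(const char* msg)   // hypothetical timer callback
{
  printf("%s\n", msg);
}

int main()
{
  muduo::net::EventLoop loop;
  loop.runAfter(2.5, boost::bind(&print, "once after 2.5s"));   // one-shot timer
  loop.runEvery(1.0, boost::bind(&print, "every second"));      // repeating timer
  loop.runAfter(10.0, boost::bind(&muduo::net::EventLoop::quit, &loop));
  loop.loop();   // handleRead() above fires these callbacks when the timerfd expires
  return 0;
}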
The job of TcpConnection is to handle reading from, writing to, and closing the socket once the TCP connection has been established. As before, let's look at the structure of TcpConnection:
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,                 // the Reactor this connection belongs to
                const string& name,              // connection name
                int sockfd,                      // connected fd
                const InetAddress& localAddr,    // local address
                const InetAddress& peerAddr);    // peer address

  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();     // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };

  void sendInLoop(const void* message, size_t len);  // send a message
  void setState(StateE s) { state_ = s; }

  EventLoop* loop_;
  string name_;
  StateE state_;  // FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;             // the socket
  boost::scoped_ptr<Channel> channel_;           // the connection's channel
  InetAddress localAddr_;
  InetAddress peerAddr_;
  ConnectionCallback connectionCallback_;        // fired both when the connection is established and when it is torn down
  MessageCallback messageCallback_;              // fired when data is readable
  WriteCompleteCallback writeCompleteCallback_;  // fired when a write completes
  CloseCallback closeCallback_;                  // fired when the connection is closed
  Buffer inputBuffer_;                           // buffer for incoming data
  Buffer outputBuffer_;  // FIXME: use list<Buffer> as output buffer.
  boost::any context_;                           // user context
  // FIXME: creationTime_, lastReceiveTime_
  //        bytesReceived_, bytesSent_
};
First, TcpConnection sets up its channel when it is constructed. Then, once TcpClient or TcpServer has established the connection, TcpConnection::connectEstablished is called, which marks the channel as readable. When the channel becomes readable, TcpConnection internally calls handleRead, which takes care of actually reading the data; once the read finishes, the data is handed to the MessageCallback. To write, sendInLoop is called: the message is put into outputBuffer_ and the channel is marked writable. When it becomes writable, TcpConnection itself performs the write, and once everything has been written out the writeCompleteCallback fires. Both the managed reads and writes are non-blocking. To disconnect, call shutdown. To tear the connection down, call TcpConnection::connectDestroyed, which essentially removes the channel from the reactor.
At the TcpConnection layer there is no notion of how many bytes one read should yield; message framing is done by the upper layer. TcpConnection reads at most 64KB at a time and hands the data to the upper application, which decides whether the content is complete; if not, it simply returns and lets the reactor keep waiting for more reads. Writes are likewise split across multiple attempts internally. This requires that the reactor use level triggering rather than edge triggering.
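The framing contract described above looks roughly like this on the application side (a sketch of a length-prefixed codec in the message callback; the 4-byte big-endian header is an assumption for illustration, not something prescribed by muduo):

#include <muduo/net/Buffer.h>
#include <muduo/net/TcpConnection.h>
#include <string>

// Upper-layer onMessage that frames length-prefixed packets (hypothetical protocol).
void onMessage(const muduo::net::TcpConnectionPtr& conn,
               muduo::net::Buffer* buf,
               muduo::Timestamp receiveTime)
{
  while (buf->readableBytes() >= sizeof(int32_t))   // at least a whole header
  {
    const int32_t len = buf->peekInt32();           // look at the length without consuming it
    if (buf->readableBytes() < sizeof(int32_t) + len)
    {
      break;   // incomplete message: return and let the reactor wait for more data
    }
    buf->retrieveInt32();                           // consume the header
    std::string payload(buf->peek(), len);
    buf->retrieve(len);                             // consume the body
    // ... handle payload ... (a real codec would also validate len)
  }
}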
TcpClient mainly wraps the functionality of Connector.
TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& name)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(name),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  connector_->setNewConnectionCallback(
      boost::bind(&TcpClient::newConnection, this, _1));
  // FIXME setConnectFailedCallback
}
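A minimal client-side usage sketch (assuming the usual TcpClient interface of connect()/enableRetry() and the callback setters; the callbacks, address, and port are made up):

#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpClient.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/Buffer.h>
#include <cstdio>

void onConnection(const muduo::net::TcpConnectionPtr& conn)   // hypothetical
{
  printf("%s\n", conn->connected() ? "connected" : "disconnected");
}

void onMessage(const muduo::net::TcpConnectionPtr& conn,      // hypothetical
               muduo::net::Buffer* buf,
               muduo::Timestamp)
{
  printf("received %zu bytes\n", buf->readableBytes());
  buf->retrieveAll();
}

int main()
{
  muduo::net::EventLoop loop;
  muduo::net::InetAddress serverAddr("127.0.0.1", 2007);
  muduo::net::TcpClient client(&loop, serverAddr, "DemoClient");
  client.setConnectionCallback(onConnection);
  client.setMessageCallback(onMessage);
  client.enableRetry();     // reconnect if the connection is lost
  client.connect();         // drives the Connector; newConnection() fires on success
  loop.loop();
  return 0;
}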
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    hostport_(listenAddr.toHostPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr)),
    threadPool_(new EventLoopThreadPool(loop)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    started_(false),
    nextConnId_(1)
{
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}
Likewise, the Acceptor object is created and its callback set to TcpServer::newConnection, while the TcpConnection callbacks are configured from the outside. Then start() is called to launch the server; start() calls Acceptor::listen, and once a connection is established, newConnection is invoked. Here is the newConnection code:
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  // FIXME poll with zero timeout to double confirm the new connection
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  TcpConnectionPtr conn(
      new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1));  // FIXME: unsafe
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}
On the server side every connection is given a unique name and stored as a string key in the connections_ map. threadPool_->getNextLoop() picks the IO threads in round-robin order, so TcpConnection::connectEstablished is distributed round-robin across them. Keeping the connections in connections_ has a purpose: since every TcpConnection has a unique name, the server can later remove a connection from its own map by that name. In the destructor, iterating over connections_ yields every established connection so they can be released one by one.
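To tie the server-side pieces together, a minimal echo-server sketch (the callback body and port are made up; setThreadNum is assumed to be the usual way to size the EventLoopThreadPool):

#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpServer.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/Buffer.h>

// Hypothetical message callback: echo everything back to the peer.
void onMessage(const muduo::net::TcpConnectionPtr& conn,
               muduo::net::Buffer* buf,
               muduo::Timestamp)
{
  conn->send(buf->retrieveAllAsString());
}

int main()
{
  muduo::net::EventLoop loop;                   // the main (acceptor) reactor
  muduo::net::InetAddress listenAddr(2007);     // port is arbitrary for the example
  muduo::net::TcpServer server(&loop, listenAddr, "EchoServer");
  server.setMessageCallback(onMessage);
  server.setThreadNum(4);                       // 4 IO threads; getNextLoop() round-robins over them
  server.start();                               // Acceptor::listen(); newConnection() on each accept
  loop.loop();
  return 0;
}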