muduo库源码分析1——ChargenServer服务器启动流程

ChargenServer服务器启动流程

        在使用TCPServer时,用户需要注册connectionCallback_,messageCallback_writeCompleteCallback_三个事件句柄,每当TCPServer中新构造一个新Acceptor对象。其构造函数会先初始化Channel成员对象acceptChannel_(loop, acceptSocket_.fd()),然后调用acceptChannel_.setReadCallback(boost::bind(&Acceptor::handleRead, this))Acceptor的成员函数绑定为其Channel成员的事件句柄,并在TCPServer::start()调用acceptChannel_.enableReading()loop登记轮询。并在接下来调用acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2));TcpServer::newConnection赋值给Acceptor::newConnectionCallback_[newConnectionCallback_Acceptor::handleRead回调]

        当TCPServerAcceptor成员接收到一个新的连接请求后最终调用TcpServer::newConnectionCallback_函数,产生新的TcpConnection对象。在其构造函数中初始化成员Channel成员对象channel_(new Channel(loop, sockfd)),续而调用channel_->setReadCallbackchannel_->setWriteCallback等将TcpConnection的成员函数绑定为其Channel的事件回调,并在随后调用ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn))向loop登记轮询。最后将TcpServer::connectionCallback_\messageCallback_\writeCompleteCallback_赋值给TcpConnection对应函数对象[分别在Acceptor::handleWrite/handleRead回调]

        其中HttpProxy对象封装了http服务器的所有行为,用户需要实现onConnection,onMessageonWriteComplete三个回调函数,并在构造函数中将三个函数注册为TcpServer对象的事件句柄。此处看似违背了线程安全,在构造函数中暴露this指针,实际上由于设计时不考虑跨线程使用HttpProxy的实例,并且在代码是现实强行限制了对象使用的线程,所以不存在线程安全方面的隐患。启动Tcp服务器的时序图如图6-X所示:


图1 启动Tcp服务器的时序图错误:“登记事件轮训非函数返回

        Tcp服务启动实际上是线程的事件循环启动,网络服务器的事件循环实际上是不断在while循环体的开始使用epoll系统调用轮训监听的套接字,查看是否有事件发生,若果有事件,则调用注册的事件句柄处理,事件处理时序图如图6-X所示:


图2 Tcp事件处理时序图(错误:“发送数据非同步调用;epoll非函数返回,而是EventLoop的自调用

函数调用流程:

>EventLoop  loop; // 构造线程事件循环对象
| > poller_(Poller::newDefaultPoller(this))  // 初始化poller_,构造newDefaultPoller对象
| > wakeupChannel_(new Channel(this, wakeupFd_))  // 初始化wakeupChannel_,构造一个Channel对象
| >wakeupChannel_->setReadCallback(boost::bind(&EventLoop::handleRead, this));  // wakeupChannel_注册读事件回调
| >wakeupChannel_-> enableReading()  // Channel对象核心函数,将Channel对象在事件循环中登记,使Channel对象可读
| | >events_ |= kReadEvent;  // 设置事件
| | >update();  // 注册wakeupChannel_对象到Pollerstd::map<int, Channel*>容器
| | | >loop_->updateChannel(this);
| | | >poller_->updateChannel(channel);
| | | | // socket文件描述符添加struct pollfd数组,以便poll()函数遍历
| | | | >struct  pollfd pfd;
| | | | >pfd.fd = channel->fd();
| | | | | >pfd.events = static_cast<short>(channel->events());
| | | | | >pfd.revents = 0;
| | | | | >pollfds_.push_back(pfd);  //  pollfds_ std::vector<struct pollfd>数组
| | | | | // 
| | | | | >channels_[pfd.fd] = channel;  //  channel 对象放入std::map<int, Channel*>
| | | | | |
>ChargenServer  server(&loop, listenAddr, true);  //构造用户服务器对象
| >server_(loop, listenAddr, "ChargenServer")  // 初始化TCPServer server_对象
| | > acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))  // 初始化Acceptor acceptor_对象用于处理连接操作
| | | > acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family()))  // 初始化Socket acceptSocket_对象,创建非阻塞套接字
| | | > acceptChannel_(loop, acceptSocket_.fd())  // acceptSocket_绑定Channel对象,处理其IO操作
| | | | >loop_(loop)  //初始化loop_指针
| | | | >fd_(fd__)  // 初始化socket文件描述符
| | | //  绑定ip地址和端口
| | | >acceptSocket_.setReuseAddr(true);
| | | >acceptSocket_.setReusePort(reuseport);
| | | >acceptSocket_.bindAddress(listenAddr);
| | | // 
| | | >acceptChannel_. setReadCallback(boost::bind(&Acceptor:: handleRead, this));  // acceptChannel_绑定读事件回调
| | | | >readCallback_=cb  //设置Channel acceptChannel_对象读回调成员readCallback_
| | | | > 其中Acceptor:: handleRead函数
| | | | | >int connfd = acceptSocket_.accept(&peerAddr);  // 回调handleRead函数意味着,有新连接请求,则需要调用::accept
| | | | | > newConnectionCallback_(connfd, peerAddr);  // 调用Acceptor的连接请求回调
| | >connectionCallback_(defaultConnectionCallback)  //TCPServer server_对象绑定默认连接回调
| | >messageCallback_(defaultMessageCallback)  //TCPServer server_对象绑定默认消息回调
| | | > 其中defaultMessageCallback函数只负责写日志:
| |  >acceptor_->setNewConnectionCallback(boost::bind(&TcpServer:: newConnection, this, _1, _2));  //Acceptor acceptor_对象设置 连接回调newConnectionCallback_
| | | > 其中TcpServer:: newConnection函数处理新连接请求:
| | | | >TcpConnectionPtr  conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));  // 构造新连接对象
| | | | | >socket_(new Socket(sockfd)) / /初始化TCP连接的socket描述符
| | | | | >channel_(new Channel(loop, sockfd))  //socket绑定Channel对象
| | | | | >localAddr_(localAddr),peerAddr_(peerAddr)  // 记录连接两端的地址
| | | | | // 设置TcpConnection:: channel_ 对象的事件处理函数。可以看出,实际上 channel _调用事件回调的处理核心全部来自其父对象 TcpConnection ,使用boost::bind()原因是使回调函数不受普通成员函数的限制
| | | | | >channel_->setReadCallback(boost::bind(&TcpConnection:: handleRead, this, _1));
| | | | | | > 其中TcpConnection:: handleRead函数:
| | | | | | | > messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);  // messageCallback_ TcpConnection :: setMessageCallback ()绑定
| | | | | >channel_->setWriteCallback(boost::bind(&TcpConnection:: handleWrite, this));
| | | | | | > 其中TcpConnection:: handleWrite函数:
| | | | | | | >sockets::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes());  // socket可写时,调用sockets::write发送数据
| | | | | | | >loop_->queueInLoop(boost::bind( writeCompleteCallback_, shared_from_this()));  // 且在发送成功后调用发送完成回调
| | | | | >channel_->setCloseCallback(boost::bind(&TcpConnection:: handleClose, this));
| | | | | | > 其中,  TcpConnection:: handleClose函数:
| | | | | | | >connectionCallback_(guardThis);
| | | | | | | > closeCallback_(guardThis); ????
| | | | | >channel_->setErrorCallback(boost::bind(&TcpConnection:: handleError, this));
| | | | | | > 其中,  TcpConnection:: handleError负责写日志
| | | | | //
| | | | >connections_[connName] = conn;  // 新连接加入连接队列
| | | | // TcpConnection conn对象绑定连接,消息,发送,关闭回调
| | | | >conn->setConnectionCallback(connectionCallback_);  //设置TcpServer::connectionCallback_ ->  TcpConnection ::connectionCallback_
| | | | >conn->setMessageCallback(messageCallback_);  //设置TcpServer::messageCallback_ ->  TcpConnection ::messageCallback_
| | | | >conn->setWriteCompleteCallback(writeCompleteCallback_);  //设置TcpServer::writeCompleteCallback_ ->  TcpConnection ::writeCompleteCallback_
| | | | >conn->setCloseCallback(boost::bind(&TcpServer:: removeConnection, this, _1));  //设置TcpServer:: closeCallback _ ->  TcpConnection :: closeCallback _
| | | | | > 其中TcpServer:: removeConnection  ????
| | | | // 
| | | | >ioLoop-> runInLoop(boost::bind(&TcpConnection:: connectEstablished, conn));  //调用EventLoop::runInLoop(const Functor& cb)函数的目的是当使用线程池启动多个事件循环线程时,将connectEstablished函数投递到其他事件循环线程调用,分摊负载
| | | | | > 其中TcpConnection:: connectEstablished函数:
| | | | | | >channel_-> enableReading();  //loop登记新连接的channel对象
| | | | | | >connectionCallback_(shared_from_this());
| >server_.setConnectionCallback(boost::bind(&ChargenServer:: onConnection, this, _1));  //TcpServer注册connectionCallback_成员,连接相关(建立和关闭)事件句柄
| >server_.setMessageCallback(boost::bind(&ChargenServer:: onMessage, this, _1, _2, _3));  //TcpServer注册messageCallback_成员,可读事件句柄
| >server_.setWriteCompleteCallback(boost::bind(&ChargenServer:: onWriteComplete, this, _1));  //TcpServer注册writeCompleteCallback_成员,可写事件句柄
>server.start();  // 启动TCP服务器,实际上就是打开监听套接字
| >threadPool_->start(threadInitCallback_);  // 启动线程池,如果需要的话,主要取决于numThreads_。当numThreads_==0时,线程池不启动
| >loop_->runInLoop(boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
| | 其中Acceptor:: listen函数:
| | >acceptSocket_.listen();  // 开始监听
| | | >sockets::listenOrDie(sockfd_);
| | >acceptChannel_. enableReading();  //loop登记新连接的channel对象
>loop.loop();
| >while (!quit_){
| | >pollReturnTime_ = poller_->poll(kPollTimeMs, & activeChannels_);  // 调用poller对象pool方法,获得有事件发生的socket描述符对应Channel数组
| | >for (ChannelList::iterator it = activeChannels_.begin();it != activeChannels_.end(); ++it){  // 遍历被激活的Channel数组
| | | >currentActiveChannel_ = *it;
| | | >currentActiveChannel_->handleEvent(pollReturnTime_);  // 调用处理事件的句柄
| | | | >handleEventWithGuard(receiveTime);
| | | | | >if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
| | | | | | > closeCallback_();
| | | | | >if (revents_ & POLLNVAL)
| | | | | >if (revents_ & (POLLERR | POLLNVAL))
| | | | | | > errorCallback_();
| | | | | >if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
| | | | | | > readCallback_(receiveTime);
| | | | | >if (revents_ & POLLOUT)
| | | | | | > writeCallback_();
| | }
| }