本文从总体上介绍下百度的通用通讯组件, 须要下载源码的同窗,请点这里 http://bgcc.baidu.com/node
第一部分:服务端逻辑数组
1.线程池,在服务启动时建立线程,socket
2.一个线程池对应一个同步的任务队列 ,函数
3.线程池中的每一个线程,初始时,都阻塞在任务队列, 等待唤醒,oop
4.主线程进入事件循环,监听事件post
5.遇到读事件,调用添加事件时指定的回调函数, 默认为DataCallback, 若是是链接请求,调用AcceptDataCallbackui
6.DataCallback解析包头和包体, 获得processor名称, 找到对应的processor, this
7.把processor、包体、序列化对象封装到成一个任务,push到同步队列中,并触发信号量 sem_postspa
8.线程池中的线程醒来, 调用processor处理task任务, 从包体中读出指望执行的函数名, 执行该函数(业务实现)线程
9.业务在该函数中实现功能后, 还需调用序列化类的writeMessageBegin, writeMessageEnd, 进而调用socket类中的write, 并最终调用send返回处理结果给客户端。
这里没有使用EpollOut事件, 当发送缓冲区满时, 会返回-1,errno=EAGAIN, 此时bgcc直接返回失败了, 没有继续处理。 这里若是了注册epoll_out事件,就能够处理了。
10.从新注册事件
看一个服务端使用的例子
1 Server* server; 2 3 void* server_func(const bool* isstopped, void*) { 4 SharedPointer<IProcessor> xp( 5 new MathProcessor( 6 SharedPointer<Math>( 7 new MathImpl))); 8 9 ServiceManager sm; 10 sm.add_service(xp); 11 12 ThreadPool tp; 13 tp.init(10); 14 15 server = new Server(&sm, &tp, 8321); 16 if (0 != server->serve()) { 17 return 0; 18 } 19 return NULL; 20 } 21 22 int main(int argc, char* argv[]) { 23 log_open("server.conf"); 24 Thread t(server_func); 25 t.start(); 26 27 return 0; 28 }
例子中, Server是bgcc::EpollServer类。定义以下
1 #ifndef _BGCC2_EPOLL_SERVER_H_ 2 #define _BGCC2_EPOLL_SERVER_H_ 3 4 #ifndef _WIN32 5 6 #include "bgcc_common.h" 7 #include "server.h" 8 #include "event_poll.h" 9 #include "mempool.h" 10 #include "service_manager.h" 11 #include "thread_pool.h" 12 #include "server_task.h" 13 14 namespace bgcc { 15 16 class EpollServer : public IServer { 17 public: 18 EpollServer(ServiceManager* service_manager, 19 ThreadPool* thread_pool, 20 uint16_t port, 21 const std::string& node = ""); 22 23 virtual ~EpollServer() { 24 } 25 26 virtual int32_t init(); 27 virtual int32_t serve(); 28 virtual int32_t stop(); 29 30 ServiceManager* get_service_manager(); 31 ThreadPool* get_thread_pool(); 32 33 TaskAsso Tasks[MAXNFD]; 34 35 protected: 36 enum state_t { 37 S_UNINIT, 38 S_INIT, 39 S_SERVE, 40 S_STOPPED 41 }; 42 int32_t socket_init(); 43 44 ServiceManager* _service_manager; 45 ThreadPool* _thread_pool; 46 uint16_t _port; 47 state_t _state; 48 int32_t _listenfd; 49 EventLoop _loop; 50 std::string _node; 51 }; 52 53 typedef EpollServer Server; 54 } 55 56 #endif // _WIN32 57 58 #endif // _BGCC2_EPOLL_SERVER_H_
咱们主要关心EpollServer::serve(), 其实现以下
1 int32_t EpollServer::serve() { 2 3 int32_t ret; 4 if (0 != (ret = init())) { 5 return ret; 6 } 7 8 if (S_INIT != _state) { 9 BGCC_NOTICE("bgcc", "Need to call `init' before `serve' on Instance of EpollServer\n"); 10 return E_BGCC_SERVER_NEED_INIT; 11 } 12 13 _listenfd = socket_init(); 14 if (INVALID_SOCKET == _listenfd) { 15 return E_BGCC_SERVER_CREATE_LISTENFD_FAILED; 16 } 17 18 Event e; 19 EventCallback::PrepareEvent(e, _listenfd, const_cast<EpollServer*>(this)); 20 e.read_cb = EventCallback::AcceptCallback; 21 _loop.add_event(&e); 22 23 BGCC_NOTICE("bgcc", "fd=%d Is Begin To Wait for accept new client On %s:%d", 24 _listenfd, (_node.empty()?"*":_node.c_str()), _port); 25 _state = S_SERVE; 26 27 return _loop.loop(); 28 }
其中_loop类型是bgcc::EventLoop, 其定义以下
1 /** 2 * @brief 事件循环 3 * @see 4 * @note 5 * @author liuxupeng(liuxupeng@baidu.com) 6 * @date 2012年06月14日 20时05分36秒 7 */ 8 class EventLoop { 9 public: 10 /** 11 * @brief EventLoop 构造函数 12 * @see 13 * @note 14 * @author liuxupeng(liuxupeng@baidu.com) 15 * @date 2012年06月14日 20时19分50秒 16 */ 17 EventLoop(); 18 19 /** 20 * @brief create 建立内部epoll 21 * 22 * @return 成功返回0 23 * @see 24 * @note 25 * @author liuxupeng(liuxupeng@baidu.com) 26 * @date 2012年06月14日 20时19分59秒 27 */ 28 int32_t create(); 29 int32_t destroy(); 30 31 int32_t add_event(Event* event); 32 int32_t del_event(Event* event); 33 34 int32_t loop(); 35 int32_t unloop(); 36 bool is_stopped() const; 37 private: 38 enum state_t { 39 S_UNINIT, 40 S_INIT, 41 S_LOOP, 42 S_STOP, 43 S_DESTROYED 44 }; 45 private: 46 state_t _state; 47 volatile bool _stopped; 48 int32_t _epfd; 49 struct epoll_event _ep_events[MAXNFD]; 50 Event _events[MAXNFD]; 51 }; 52 }
其中loop()实现以下
1 int32_t EventLoop::loop() { 2 if (S_INIT != _state) { 3 return -1; 4 } 5 _state = S_LOOP; 6 _stopped = false; 7 8 while (!_stopped) { 9 int32_t numevents; 10 while((numevents=epoll_wait(_epfd, _ep_events, MAXNFD, 200))==SOCKET_ERROR&&EINTR==errno); 11 12 if (numevents > 0) { 13 int j; 14 15 for (j = 0; j < numevents; j++) { 16 struct epoll_event* e = _ep_events + j; 17 int32_t fd = e->data.fd; 18 19 if (e->events & EPOLLIN) { 20 if (_events[fd].read_cb) 21 (_events[fd].read_cb)(this, fd, _events[fd].read_cb_arg); 22 } 23 if (e->events & EPOLLOUT) { 24 if (_events[fd].write_cb) 25 (_events[fd].write_cb)(this, fd, _events[fd].write_cb_arg); 26 } 27 if (e->events & EPOLLERR) { 28 if (_events[fd].error_cb) 29 (_events[fd].error_cb)(this, fd, _events[fd].error_cb_arg); 30 } 31 } 32 } 33 } 34 _state = S_STOP; 35 return 0;
从实现上看, 事件循环使用了epoll_wait,EventLoop类中有一个事件数组成员_events[], 全部fd事件都保存在该数组中。 增删fd事件,也都须要操做该数组。
添加事件
1 int32_t EventLoop::add_event(Event* event) { 2 if (S_INIT != _state && S_LOOP != _state) { 3 return -1; 4 } 5 6 if (NULL == event) { 7 return 0; 8 } 9 10 int32_t fd = event->fd; 11 uint32_t mask = event->mask; 12 13 int32_t op; 14 if (EVENT_NONE == _events[fd].mask) { 15 op = EPOLL_CTL_ADD; 16 } 17 else { 18 op = EPOLL_CTL_MOD; 19 mask |= _events[fd].mask; 20 } 21 _events[fd].mask = mask; 22 23 struct epoll_event ee; 24 25 // To fix valgrind error: Syscall param epoll_ctl(event) points to uninitialised byte(s) 26 memset(&ee.data, 0, sizeof(ee.data)); 27 28 ee.data.fd = fd; 29 ee.events = 0; 30 31 if (mask & EVENT_READ) { 32 ee.events |= EPOLLIN; 33 _events[fd].read_cb = event->read_cb; 34 _events[fd].read_cb_arg = event->read_cb_arg; 35 } 36 37 if (mask & EVENT_WRITE) { 38 ee.events |= EPOLLOUT; 39 _events[fd].write_cb = event->write_cb; 40 _events[fd].write_cb_arg = event->write_cb_arg; 41 } 42 43 if (mask & EVENT_ERROR) { 44 ee.events |= EPOLLERR; 45 _events[fd].error_cb = event->error_cb; 46 _events[fd].error_cb_arg = event->error_cb_arg; 47 } 48 49 if(SocketTool::set_nonblock(fd, 1)!=0){ 50 BGCC_WARN("bgcc", "Before Add fd=%d to Epoll Set To Nonblock Failed(%d)", 51 fd, BgccGetLastError()); 52 return -1; 53 } 54 55 int32_t ret=epoll_ctl(_epfd, op, fd, &ee); 56 if(0!=ret&&EPOLL_CTL_ADD==op){ 57 if(SocketTool::set_nonblock(fd, 0)!=0){ 58 BGCC_WARN("bgcc", "Add fd=%d to Epoll Failed Set To Block Failed(%d)", 59 fd, BgccGetLastError()); 60 } 61 } 62 63 return ret; 64 }
删除事件
1 int32_t EventLoop::del_event(Event* event) { 2 if (S_INIT != _state && S_LOOP != _state) { 3 return -1; 4 } 5 6 if (NULL == event) { 7 return 0; 8 } 9 10 int32_t fd = event->fd; 11 uint32_t mask = _events[fd].mask & (~event->mask); 12 _events[fd].mask = mask; 13 14 struct epoll_event ee; 15 ee.data.fd = fd; 16 ee.events = 0; 17 if (mask & EVENT_READ) ee.events |= EPOLLIN; 18 if (mask & EVENT_WRITE) ee.events |= EPOLLOUT; 19 20 int32_t op; 21 if (mask != EVENT_NONE) { 22 op = EPOLL_CTL_MOD; 23 } else { 24 op = EPOLL_CTL_DEL; 25 // _events[fd].Reset(); 26 } 27 28 int32_t ret=epoll_ctl(_epfd, op, fd, &ee); 29 if(EPOLL_CTL_DEL==op&&0==ret){ 30 if(SocketTool::set_nonblock(fd, 0)!=0){ 31 BGCC_WARN("bgcc", "Del fd=%d From Epoll Set To Block Failed(%d)", 32 fd, BgccGetLastError()); 33 } 34 } 35 return ret; 36 }
添加和删除事件,最终都是对epoll接口的封装,
接着看下_events[] 中的元素类型 Event, 其定义以下
1 /** 2 * @brief 封装事件及事件处理函数 3 * @see 4 * @note 5 * @author liuxupeng(liuxupeng@baidu.com) 6 * @date 2012年06月14日 20时00分59秒 7 */ 8 struct Event { 9 /** 10 * @brief Event 事件类 11 * @see 12 * @note 13 * @author liuxupeng(liuxupeng@baidu.com) 14 * @date 2012年06月14日 20时04分59秒 15 */ 16 Event() { 17 Reset(); 18 } 19 20 void Reset(){ 21 fd=INVALID_SOCKET; 22 mask=EVENT_NONE; 23 read_cb=NULL; 24 write_cb=NULL; 25 error_cb=NULL; 26 read_cb_arg=NULL; 27 write_cb_arg=NULL; 28 error_cb_arg=NULL; 29 } 30 31 int32_t fd; /** 事件对应的fd*/ 32 uint32_t mask; /** 事件标识位*/ 33 callback_func_t read_cb; /** 读回调*/ 34 callback_func_t write_cb; /** 写回调*/ 35 callback_func_t error_cb; /** 错误回调*/ 36 void* read_cb_arg; 37 void* write_cb_arg; 38 void* error_cb_arg; 39 };
Event对象包含了事件fd、事件类型、以及回调函数、回调函数的参数, 这里有read_cb, write_cb, error_cb
回到EpollServer::serve(), 它调用了EpollServer::init()