BGCC源代码(一)

本文从总体上介绍下百度的通用通讯组件, 须要下载源码的同窗,请点这里 http://bgcc.baidu.com/node

第一部分:服务端逻辑数组

1.线程池,在服务启动时建立线程,socket

2.一个线程池对应一个同步的任务队列 ,函数

3.线程池中的每一个线程,初始时,都阻塞在任务队列, 等待唤醒,oop

4.主线程进入事件循环,监听事件post

5.遇到读事件,调用添加事件时指定的回调函数, 默认为DataCallback, 若是是链接请求,调用AcceptDataCallbackui

6.DataCallback解析包头和包体, 获得processor名称,  找到对应的processor, this

7.把processor、包体、序列化对象封装到成一个任务,push到同步队列中,并触发信号量 sem_postspa

8.线程池中的线程醒来, 调用processor处理task任务, 从包体中读出指望执行的函数名, 执行该函数(业务实现)线程

9.业务在该函数中实现功能后, 还需调用序列化类的writeMessageBegin, writeMessageEnd, 进而调用socket类中的write, 并最终调用send返回处理结果给客户端。

  这里没有使用EpollOut事件, 当发送缓冲区满时, 会返回-1,errno=EAGAIN,  此时bgcc直接返回失败了, 没有继续处理。  这里若是了注册epoll_out事件,就能够处理了。

10.从新注册事件

 

看一个服务端使用的例子

复制代码

1 Server* server;
 2 
 3 void* server_func(const bool* isstopped, void*) {
 4     SharedPointer<IProcessor> xp(
 5             new MathProcessor(
 6                 SharedPointer<Math>(
 7                     new MathImpl)));
 8 
 9     ServiceManager sm;
10     sm.add_service(xp);
11 
12     ThreadPool tp;
13     tp.init(10);
14 
15     server = new Server(&sm, &tp, 8321);
16     if (0 != server->serve()) {
17         return 0;
18     }
19     return NULL;
20 }
21 
22 int main(int argc, char* argv[]) {
23     log_open("server.conf");
24     Thread t(server_func);
25     t.start();
26 
27     return 0;
28 }

复制代码

例子中, Server是bgcc::EpollServer类。定义以下

复制代码

1 #ifndef _BGCC2_EPOLL_SERVER_H_
 2 #define _BGCC2_EPOLL_SERVER_H_
 3 
 4 #ifndef _WIN32
 5 
 6 #include "bgcc_common.h"
 7 #include "server.h"
 8 #include "event_poll.h"
 9 #include "mempool.h"
10 #include "service_manager.h"
11 #include "thread_pool.h"
12 #include "server_task.h"
13 
14 namespace bgcc {
15 
16     class EpollServer : public IServer {
17     public:
18         EpollServer(ServiceManager* service_manager,
19                 ThreadPool* thread_pool,
20                 uint16_t port,
21                 const std::string& node = "");
22 
23         virtual ~EpollServer() {
24         }
25 
26         virtual int32_t init();
27         virtual int32_t serve();
28         virtual int32_t stop();
29 
30         ServiceManager* get_service_manager();
31         ThreadPool* get_thread_pool();
32 
33         TaskAsso Tasks[MAXNFD];
34 
35     protected:
36         enum state_t {
37             S_UNINIT,
38             S_INIT,
39             S_SERVE,
40             S_STOPPED
41         };
42         int32_t socket_init();
43 
44         ServiceManager* _service_manager;
45         ThreadPool* _thread_pool;
46         uint16_t _port;
47         state_t _state;
48         int32_t _listenfd;
49         EventLoop _loop;
50         std::string _node;
51     };
52 
53     typedef EpollServer Server;
54 }
55 
56 #endif // _WIN32
57 
58 #endif // _BGCC2_EPOLL_SERVER_H_

复制代码

咱们主要关心EpollServer::serve(), 其实现以下

复制代码

1     int32_t EpollServer::serve() {
 2 
 3         int32_t ret;
 4         if (0 != (ret = init())) {
 5             return ret;
 6         }
 7 
 8         if (S_INIT != _state) {
 9             BGCC_NOTICE("bgcc", "Need to call `init' before `serve' on Instance of EpollServer\n");
10             return E_BGCC_SERVER_NEED_INIT;
11         }
12 
13         _listenfd = socket_init();
14         if (INVALID_SOCKET == _listenfd) {
15             return E_BGCC_SERVER_CREATE_LISTENFD_FAILED;
16         }
17 
18         Event e;
19         EventCallback::PrepareEvent(e, _listenfd, const_cast<EpollServer*>(this));
20         e.read_cb = EventCallback::AcceptCallback;
21         _loop.add_event(&e);
22 
23         BGCC_NOTICE("bgcc", "fd=%d Is Begin To Wait for accept new client On %s:%d", 
24                 _listenfd, (_node.empty()?"*":_node.c_str()), _port);
25         _state = S_SERVE;
26         
27         return _loop.loop();
28     }

复制代码

其中_loop类型是bgcc::EventLoop, 其定义以下

复制代码

1     /**
 2      * @brief 事件循环
 3      * @see
 4      * @note
 5      * @author  liuxupeng(liuxupeng@baidu.com)
 6      * @date    2012年06月14日 20时05分36秒
 7      */
 8     class EventLoop {
 9     public:
10         /**
11          * @brief EventLoop 构造函数
12          * @see
13          * @note
14          * @author  liuxupeng(liuxupeng@baidu.com)
15          * @date    2012年06月14日 20时19分50秒
16          */
17         EventLoop();
18 
19         /**
20          * @brief create 建立内部epoll
21          *
22          * @return 成功返回0
23          * @see
24          * @note
25          * @author  liuxupeng(liuxupeng@baidu.com)
26          * @date    2012年06月14日 20时19分59秒
27          */
28         int32_t create();
29         int32_t destroy();
30 
31         int32_t add_event(Event* event);
32         int32_t del_event(Event* event);
33 
34         int32_t loop();
35         int32_t unloop();
36         bool is_stopped() const;
37     private:
38         enum state_t {
39             S_UNINIT,
40             S_INIT,
41             S_LOOP,
42             S_STOP,
43             S_DESTROYED
44         };
45     private:
46         state_t _state;
47         volatile bool _stopped;
48         int32_t _epfd;
49         struct epoll_event _ep_events[MAXNFD];
50         Event _events[MAXNFD];
51     };
52 }

复制代码

其中loop()实现以下  

复制代码

1     int32_t EventLoop::loop() {
 2         if (S_INIT != _state) {
 3             return -1;
 4         }
 5         _state = S_LOOP;
 6         _stopped = false;
 7 
 8         while (!_stopped) {
 9             int32_t numevents;
10             while((numevents=epoll_wait(_epfd, _ep_events, MAXNFD, 200))==SOCKET_ERROR&&EINTR==errno);
11 
12             if (numevents > 0) {
13                 int j;
14 
15                 for (j = 0; j < numevents; j++) {
16                     struct epoll_event* e = _ep_events + j;
17                     int32_t fd = e->data.fd;
18 
19                     if (e->events & EPOLLIN) {
20                         if (_events[fd].read_cb)
21                             (_events[fd].read_cb)(this, fd, _events[fd].read_cb_arg);
22                     }
23                     if (e->events & EPOLLOUT) {
24                         if (_events[fd].write_cb)
25                             (_events[fd].write_cb)(this, fd, _events[fd].write_cb_arg);
26                     }
27                     if (e->events & EPOLLERR) {
28                         if (_events[fd].error_cb)
29                             (_events[fd].error_cb)(this, fd, _events[fd].error_cb_arg);
30                     }
31                 }
32             }
33         }
34         _state = S_STOP;
35         return 0;  

复制代码

 从实现上看,  事件循环使用了epoll_wait,EventLoop类中有一个事件数组成员_events[], 全部fd事件都保存在该数组中。 增删fd事件,也都须要操做该数组。

添加事件

复制代码

1     int32_t EventLoop::add_event(Event* event) {
 2         if (S_INIT != _state && S_LOOP != _state) {
 3             return -1;
 4         }
 5 
 6         if (NULL == event) {
 7             return 0;
 8         }
 9 
10         int32_t fd = event->fd;
11         uint32_t mask = event->mask;
12 
13         int32_t op;
14         if (EVENT_NONE == _events[fd].mask) {
15             op = EPOLL_CTL_ADD;
16         }
17         else {
18             op = EPOLL_CTL_MOD;
19             mask |= _events[fd].mask;
20         }
21         _events[fd].mask = mask;
22 
23         struct epoll_event ee;
24 
25         // To fix valgrind error: Syscall param epoll_ctl(event) points to uninitialised byte(s)
26         memset(&ee.data, 0, sizeof(ee.data));
27 
28         ee.data.fd = fd;
29         ee.events = 0;
30 
31         if (mask & EVENT_READ) {
32             ee.events |= EPOLLIN;
33             _events[fd].read_cb = event->read_cb;
34             _events[fd].read_cb_arg = event->read_cb_arg;
35         }
36 
37         if (mask & EVENT_WRITE) {
38             ee.events |= EPOLLOUT;
39             _events[fd].write_cb = event->write_cb;
40             _events[fd].write_cb_arg = event->write_cb_arg;
41         }
42 
43         if (mask & EVENT_ERROR) {
44             ee.events |= EPOLLERR;
45             _events[fd].error_cb = event->error_cb;
46             _events[fd].error_cb_arg = event->error_cb_arg;
47         }
48 
49         if(SocketTool::set_nonblock(fd, 1)!=0){
50             BGCC_WARN("bgcc", "Before Add fd=%d to Epoll Set To Nonblock Failed(%d)",
51                     fd, BgccGetLastError());
52             return -1;
53         }
54 
55         int32_t ret=epoll_ctl(_epfd, op, fd, &ee);
56         if(0!=ret&&EPOLL_CTL_ADD==op){
57             if(SocketTool::set_nonblock(fd, 0)!=0){
58                 BGCC_WARN("bgcc", "Add fd=%d to Epoll Failed Set To Block Failed(%d)",
59                     fd, BgccGetLastError());
60             }
61         }
62 
63         return ret;
64     }

复制代码

删除事件

复制代码

1    int32_t EventLoop::del_event(Event* event) {
 2         if (S_INIT != _state && S_LOOP != _state) {
 3             return -1;
 4         }
 5 
 6         if (NULL == event) {
 7             return 0;
 8         }
 9 
10         int32_t fd = event->fd;
11         uint32_t mask = _events[fd].mask & (~event->mask);
12         _events[fd].mask = mask;
13 
14         struct epoll_event ee;
15         ee.data.fd = fd;
16         ee.events = 0;
17         if (mask & EVENT_READ) ee.events |= EPOLLIN;
18         if (mask & EVENT_WRITE) ee.events |= EPOLLOUT;
19 
20         int32_t op;
21         if (mask != EVENT_NONE) {
22             op = EPOLL_CTL_MOD;
23         } else {
24             op = EPOLL_CTL_DEL;
25 //            _events[fd].Reset();
26         }
27 
28         int32_t ret=epoll_ctl(_epfd, op, fd, &ee);
29         if(EPOLL_CTL_DEL==op&&0==ret){
30             if(SocketTool::set_nonblock(fd, 0)!=0){
31                 BGCC_WARN("bgcc", "Del fd=%d From Epoll Set To Block Failed(%d)",
32                         fd, BgccGetLastError());
33             }
34         }
35         return ret;
36     }

复制代码

添加和删除事件,最终都是对epoll接口的封装,

接着看下_events[] 中的元素类型 Event, 其定义以下

复制代码

1    /**
 2      * @brief 封装事件及事件处理函数
 3      * @see
 4      * @note
 5      * @author  liuxupeng(liuxupeng@baidu.com)
 6      * @date    2012年06月14日 20时00分59秒
 7      */
 8     struct Event {
 9         /**
10          * @brief Event 事件类
11          * @see
12          * @note
13          * @author  liuxupeng(liuxupeng@baidu.com)
14          * @date    2012年06月14日 20时04分59秒
15          */
16         Event() {
17             Reset();
18             }
19 
20         void Reset(){
21             fd=INVALID_SOCKET;
22             mask=EVENT_NONE;
23             read_cb=NULL;
24             write_cb=NULL;
25             error_cb=NULL;    
26             read_cb_arg=NULL;
27             write_cb_arg=NULL;
28             error_cb_arg=NULL;    
29         }
30 
31         int32_t fd; /** 事件对应的fd*/
32         uint32_t mask;  /** 事件标识位*/
33         callback_func_t read_cb;    /** 读回调*/
34         callback_func_t write_cb;   /** 写回调*/
35         callback_func_t error_cb;   /** 错误回调*/
36         void* read_cb_arg;
37         void* write_cb_arg;
38         void* error_cb_arg;
39     };

复制代码

Event对象包含了事件fd、事件类型、以及回调函数、回调函数的参数, 这里有read_cb, write_cb, error_cb

回到EpollServer::serve(), 它调用了EpollServer::init()

相关文章
相关标签/搜索