去年作了一个远程升级的服务。客户端链接此服务能够下载更新程序。简单点说就是个TCP sever。基于C++。nginx
运行环境是centOS 6.5。apache
刚开始客户端数量少并且访问不频繁,因此没太关注并发的问题。当时用工具测试大概只能支持的40/秒的并发访问,并且已经有数据串包的状况出现了。最近有空作了很多的优化并记录了笔记备忘。编程
下面给出的代码都不是完整的项目源码,我只是截取了关键部分用于说明主题安全
我选择的测试工具是一个tcp客户端工具,能够快捷的进行多客户端链接的测试。多线程
最初的版本是经过多线程实现高并发的。个人工程里有两个类是单例模式,一个参数文件管理类,一个是日志管理类。一开始我实现的时候没有考虑线程安全,因而第一步我先把这两个类改为线程安全测试看看效果。并发
增长线程安全前的代码片断(只给出参数文件管理类的实现)异步
//.h class AppCof { public: static AppCof* open_cof(); private: AppCof(); class CGarbo //它的惟一工做就是在析构函数中删除CSingleton的实例 { public: ~CGarbo() { if(AppCof::m_pInstance) delete AppCof::m_pInstance; } }; static CGarbo Garbo; //定义一个静态成员变量,程序结束时,系统会自动调用它的析构函数 ...
//.cpp AppCof* AppCof::open_cof(){ if(m_pInstance == NULL){ m_pInstance = new AppCof(); } return m_pInstance; } ...
增长线程安全后socket
//.h class AppCof:boost::noncopyable { public: static AppCof* open_cof(); private: AppCof(); static AppCof *m_pInstance; static void init(); static pthread_once_t ponce_; ...
void AppCof::init() { m_pInstance = new AppCof(); if(m_pInstance != NULL) { m_pInstance->get_env(); m_pInstance->read_cof(); } } AppCof* AppCof::open_cof(){ pthread_once(&ponce_, &AppCof::init); return m_pInstance; } ...
这里有两个重点,一是pthread_once的用法,还有就是boost::noncopyable。tcp
先说说前者,函数
int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));
本函数使用初值为PTHREAD_ONCE_INIT的once_control变量保证init_routine()函数在本进程执行序列中仅执行一次。在多线程编程环境下,尽管pthread_once()调用会出如今多个线程中,init_routine()函数仅执行一次,究竟在哪一个线程中执行是不定的,是由内核调度来决定。
boost::noncopyable这种用法其实从名字能够窥探一二,一个类继承自它就表示该类不能经过赋值,复制等手段建立新的对象了。
这部分从select,epoll这些IO处理上下手。我用三个方案分别测试,效果仍是比较明显。
最初的版本是这样的:
void run_srv(const char* i_port){ for(;;){ client = server.accept_client(); std::thread t1(base_proc,client,i_port); usleep(200); t1.detach(); } } void base_proc(Socket::TCP* i_client,const char* i_port){ Socket::TCP* client = i_client; i_client = NULL; TmsProc* tmpc =new TmsProc(client); tmpc->run(); delete tmpc; return ; }
void TmsProc::run(){ Writelog::Trace(9,"业务处理开始"); try{ for(;;){ tm.tv_sec = 3; tm.tv_usec = 0; FD_ZERO(&set); FD_SET(p_client->_socket_id,&set); int iret = select(p_client->_socket_id+1,&set,NULL,NULL,&tm); if(iret < 0){ Writelog::Trace(2,"select出错:%s",strerror(errno)); return; } if(iret == 0){ Writelog::Trace(2,"select超时"); return; } Writelog::Trace(9,"监控到能够进行接收"); //收取信息 if(read_sock() == false){ Writelog::Trace(3,"检测到客户端套接字异常,准备断开链接"); break; } ...
很简单,主要流程都在run函数里。这个函数能够优化的地方有几处。好比两个if的判断能够改为if elseif的形式。由于两次if虽然是互斥的可是程序都会判断一次,效率比较低。
另外接收数据的条件能够用FD_ISSET判断是否有数据可读,若是有才真正接收,不然不处理。
因此第一种优化方案很快出炉
void TmsProc::run(){ tm.tv_sec = 60; tm.tv_usec = 0; try{ for(;;){ FD_ZERO(&set); FD_SET(p_client->_socket_id,&set); int iret = select(p_client->_socket_id+1,&set,NULL,NULL,&tm); if(iret < 0){ pLog_tmsProc->Trace(2,"select出错:%s",strerror(errno)); return; } else if(iret == 0){ pLog_tmsProc->Trace(2,"select超时"); return; } if(FD_ISSET(p_client->_socket_id,&set)) { pLog_tmsProc->Trace(9,"监控到能够进行接收"); if(read_sock() == false){ pLog_tmsProc->Trace(3,"检测到客户端套接字异常,准备断开链接"); break; } ...
注意到我把超时时间改为了60秒,
tm.tv_sec = 60;
这是我在实际测试时发现,当并发量大时,程序在处理数量多的链接时,前面分配成功的线程会超时退出,看下日志就明白了:
14:39:40][140579076663072]:准备accept [14:39:40][140579076663072]:接待并分配文件描述符[44],主服务描述符[3] [14:39:40][140579076663072]:接到链接请求,准备启动线程TCP:0x1e89e90,IP:10.0.0.106,PORT:19803 [14:39:40][140579076663072]:启动服务线程于140577928623872 [14:39:40][140579076663072]:等待接收客户端链接 [14:39:40][140579076663072]:准备accept [14:39:40][140579076663072]:接待并分配文件描述符[46],主服务描述符[3] [14:39:40][140579076663072]:接到链接请求,准备启动线程TCP:0x1e8a170,IP:10.0.0.106,PORT:19804 [14:39:40][140577928623872]:接到来自TMS端口的请求 [14:39:40][140577928623872]:业务处理开始 [14:39:40][140577918134016]:接到来自TMS端口的请求 [14:39:40][140577918134016]:业务处理开始 [14:39:40][140579076663072]:启动服务线程于140577918134016 [14:39:40][140579076663072]:等待接收客户端链接 [14:39:40][140579076663072]:准备accept [14:39:40][140579076663072]:接待并分配文件描述符[48],主服务描述符[3] [14:39:40][140579076663072]:接到链接请求,准备启动线程TCP:0x1e8a450,IP:10.0.0.106,PORT:19805 [14:39:40][140577500821248]:接到来自TMS端口的请求 [14:39:40][140577500821248]:业务处理开始 [14:39:40][140579076663072]:启动服务线程于140577500821248 [14:39:40][140579076663072]:等待接收客户端链接 [14:39:40][140579076663072]:准备accept [14:39:41][140579076663072]:接待并分配文件描述符[50],主服务描述符[3]
由于工具是模拟多个客户端同时发起请求,因而就有了上面这样的分配线程的过程,会持续的时间比较长(还要写日志),也就是同时发生的链接数越多,超时时间就要设置越长。超时改为60秒后。通过工具实测,500链接/500毫秒(应该至关于1000/秒的并发量)的处理都正常。
性能大大提升。
可是问题仍是很明显,就是超时时间。随着链接数的增大,超时也要一直增大才能保证没有线程"掉队",可是这个时间太大了会影响真正接收数据时的效率。
第二种优化方案思路来源于apache和nginx的性能差别。
apache和nginx他俩的一个重要区别是前者基于多线程实现高并发,然后者基于多进程(fork)。
而众所周知,nginx不少场景的高并发是好于apache的。
因此个人第二种方案,基本思路是为每一个链接fork一个单独的进程处理。独立进程有个最大的好处是不须要加锁了(不解释)。修改好的代码片断以下(我已经把全部带锁的地方都去掉了,这里不贴出来了)。
void run_srv(const char* i_port){ for(;;) { client = server.accept_client(); pWriteLogInstance->Trace(1,"接到链接请求,准备启动进程TCP:%p,IP:%s,PORT:%u",client,client->ip().c_str(),client->port()); if(client->_socket_id < 0) { if(errno == EINTR || errno == ECONNABORTED) continue; else { cout << "accept error" << endl; return; } } fpid = fork(); if(fpid < 0) { pWriteLogInstance->Trace(9,"fork error"); } else if(fpid > 0)//father { pWriteLogInstance->Trace(1,"father process start"); client->close(); } else //child { server.close(); pWriteLogInstance->Trace(1,"child process start"); TmsProc* tmpc =new TmsProc(client); tmpc->run(); delete tmpc; if(client != NULL) { client->close(); delete client; client = NULL; } exit(-6); } usleep(10);
void TmsProc::run(){ while(1) { //收取信息 if(read_sock() == false){ pLog_tmsProc->Trace(3,"检测到客户端套接字异常,准备断开链接"); send_info.is_bad_qry = true; break; } if(stc_tms.un_parse_size == 0) { pLog_tmsProc->Trace(3,"没有接受到有效数据,客户端关闭了"); break; } ....
测试结果跟我预想的差很少。效果也是不错的。一样是1000次/秒的并发量数据没有出现问题。并且相比较前一种方案,没有了超时时间的困扰。
第三种方案,我考虑试试 IO 处理中的王者,epoll。
epoll是Linux内核为处理大批量文件描述符而做了改进的poll,是Linux下多路复用IO接口select/poll的加强版本,它能显著提升程序在大量并发链接中只有少许活跃的状况下的系统CPU利用率。另外一点缘由就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就好了。
为了简单起见,我这里只是用了单线程的epoll,用循环来轮询客户端的socket id来处理多个客户端链接的状况。单线程的epoll号称也能处理1万以上的并发量,我要测试下是否是有这边牛X。
epoll方案的代码以下:
void TmsProc::run(){ struct epoll_event event; // 告诉内核要监听什么事件 struct epoll_event wait_event[OPEN_MAX]; //内核监听完的结果 Socket::TCP server; pLog_tmsProc->Trace(9,"准备监听端口:%d",this->port); server.listen_on_port(this->port,OPEN_MAX); Socket::TCP* client; //4.epoll相应参数准备 int fd[OPEN_MAX+1]; int i = 0, maxi = 0; int number = 0; memset(fd,-1, sizeof(fd)); fd[0] = server._socket_id; pLog_tmsProc->Trace(9,"epoll 开始准备"); int epfd = epoll_create(OPEN_MAX+1); if( -1 == epfd ) { pLog_tmsProc->Trace(9,"epoll create error"); return; } event.data.fd = server._socket_id; //监听套接字 event.events = EPOLLIN; // 表示对应的文件描述符能够读 //5.事件注册函数,将监听套接字描述符 sockfd 加入监听事件 int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, server._socket_id, &event); if(-1 == ret){ pLog_tmsProc->Trace(9,"epoll_ctl error"); return; } pLog_tmsProc->Trace(9,"业务处理开始"); while(1) { // 监视并等待多个文件(标准输入,udp套接字)描述符的属性变化(是否可读) // 没有属性变化,这个函数会阻塞,直到有变化才往下执行,这里没有设置超时 pLog_tmsProc->Trace(9,"epoll 开始监听"); number = epoll_wait(epfd, wait_event, OPEN_MAX, -1); for(int i = 0; i < number; i++) { if( (wait_event[i].events & EPOLLERR) || ( wait_event[i].events & EPOLLHUP ) || !(wait_event[i].events & EPOLLIN) ) { pLog_tmsProc->Trace(9,"epoll error"); close(wait_event[i].data.fd); continue; } else if(server._socket_id == wait_event[i].data.fd ) { while(1) { client = server.accept_client(); if(client->_socket_id == -1) { if( errno == EAGAIN || errno == EWOULDBLOCK ) { break; } else { pLog_tmsProc->Trace(9,"accept error"); break; } } Socket::TCP::make_socket_non_blocking(client->_socket_id); event.data.fd = client->_socket_id; //监听套接字 event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; // 表示对应的文件描述符能够读 //6.1.3.事件注册函数,将监听套接字描述符 connfd 加入监听事件 pLog_tmsProc->Trace(9,"为客户端注册epoll监听"); ret = epoll_ctl(epfd, EPOLL_CTL_ADD, client->_socket_id, &event); if(ret < 0){ pLog_tmsProc->Trace(9,"epoll_ctl error"); } event.data.fd = client->_socket_id; } } else { //收取信息 if(read_sock(wait_event[i].data.fd) == false){ pLog_tmsProc->Trace(3,"检测到客户端套接字异常,准备断开链接"); close(wait_event[i].data.fd); } ...
只能说epoll确实比较给力,我这只是个单线程的服务,用工具测试上述代码,500个并发也是妥妥的。