本文是本身学习经验总结,有不正确的地方,请批评指正。html
总结一下这一段时间来,有关网络编程的学习。我是从csapp的最后章节的Tiny HTTP服务器开始,以它为基础,改用不一样的方式实现并发,包括进程、线程、线程池、I/O多路复用。全部代码见地址:https://github.com/xibaohe/tiny_serverlinux
关于进程和线程的网络编程模型,在UNP卷1的第30章,有详细的介绍。我这里,在Tiny基础上,实现了如下几种:git
其中,fdbuffer是指主线程accept获得已链接描述符后,存放进fdbuffer缓冲区,其余线程再去处理。github
Signal(SIGPIPE,SIG_IGN);//忽略SIGPIPE,见UNP 5.13 Signal(SIGCHLD,sigchld_hander);//回收子进程 listenfd = Open_listenfd(port);//见csapp相关章节 while (1) { connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen); if(Fork() == 0){//the children process Close(listenfd);//子进程关闭监听描述符 doit(connfd);//子进程处理fd Close(connfd); exit(0); } Close(connfd); } void sigchld_hander(int sig) { while(waitpid(-1,0,WNOHANG) > 0) { if(verbose) printf("a child process gone!!\n"); } return; }
上述代码是最简单的并发模型,每个链接,都会新fork一个进程去处理,显然这种方式并发程度低。对于初学者,仍是有几个须要注意的地方。编程
注:doit函数来自于csapp的Tiny服务器,我添加了对HEAD、POST方法的简单支持,详细请参考所有源码。segmentfault
while (1) { connfd = Malloc(sizeof(int));//avoid race condition *connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen); Pthread_create(&tid,NULL,&thread,connfd); } void *thread(void *vargp) { int connfd = *((int *)vargp); Pthread_detach(pthread_self()); Free(vargp); doit(connfd); Close(connfd); return NULL; }
多线程与多进程基本一致,须要注意的地方:数组
为每个客户都建立一个新的线程,显然不是高效的作法,咱们能够预先建立线程,主线程和其它线程经过一个缓冲区传递描述符,或者能够每一个线程本身accept。服务器
int i; for(i=0;i<NTHREADS;i++)/*create worker threads*/ Pthread_create(&tid,NULL,thread,NULL); while (1) { connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen); sbuf_insert(&sbuf,connfd); } void *thread(void *vargp) { Pthread_detach(pthread_self()); while(1) { int connfd = sbuf_remove(&sbuf); doit(connfd); Close(connfd); } }
首先建立固定数量的线程,主线程将已链接描述符放如缓冲区中,其它线程再从缓冲区中取出fd,并处理。这是一个典型的生产者和消费者问题,在这个版本中,采用csapp中的信号量来解决同步问题,缓冲区同步的实现见csapp相关章节。网络
void sbuf_insert(sbuf_t *sp, int item) { /*write to the buffer*/ Pthead_mutex_lock(&sp->buf_mutex); if(sp->nslots == 0) { Pthread_mutex_unlock(&sp->buf_mutex); return ; } sp->buf[(++sp->rear)%(sp->n)] = item; sp->nslots--; Pthread_mutex_unlock(&sp->buf_mutex); int dosignal = 0; Pthread_mutex_lock(&sp->nready_mutex); if(sp->nready == 0) dosignal = 1; sp->nready++; Pthread_mutex_unlock(&sp->nready_mutex) if(dosignal) Pthread_cond_signal(&sp->cond); } int sbuf_remove(sbuf_t *sp) { int item; Pthread_mutex_lock(&sp->nready_mutex); while(sp->nready == 0) Pthread_cond_wait(&sp->cond,&sp->nready_mutex); item = sp->buf[(++sp->front) % (sp->n)]; Pthread_mutex_unlock(&sp->nready_mutex); if(item == 0)fprintf(stderr, "error!!!!fd item%d\n", item); return item; }
这个版本,主函数与版本3一致,缓冲区的同步我改用了互斥锁和条件变量。这里贴出sbuf insert和remove操做的实现。其中sbuf_t结构体中,nready和nslots分别指准备好待消费的描述符和缓冲区剩余的空位。多线程
在这里,为何在须要两个同步变量nready和nslots对应两个互斥锁?任意使用其中一个,当nslots小于n,或者nready大于零的时候,唤醒等待在条件变量上的线程,这样只需用一个同步变量,详见源码。我本身测试了一下,两种方式效率是差很少的。
个人理解是,当使用两个同步变量时,生产者在放入产品的时候,不阻塞消费者消费其余产品,由于没有对nready加锁,因此若是第一个阶段(放入产品)耗时比较多时,用两个同步变量更合适一些。而这里,放入产品并非耗时操做,所以效率差很少。
还有一个须要注意的地方是,我把Pthread_cond_signal放到了mutex外面,是为了不上锁冲突,见UNP卷2 7.5。
int i; for(i=0;i<NTHREADS;i++)/*create worker threads*/ Pthread_create(&tid,NULL,thread,NULL); while (1) { pause(); } } /* $end tinymain */ void *thread(void *vargp) { Pthread_detach(pthread_self()); int connfd; struct sockaddr_in clientaddr; int clientlen = sizeof(clientaddr); while(1) { Pthread_mutex_lock(&thead_lock); connfd = Accept(listenfd, (SA *)&clientaddr,&clientlen); Pthread_mutex_unlock(&thead_lock); doit(connfd); Close(connfd); } }
这个版本是预先建立固定数量的,可是由线程各自去accept, 对accept上锁保护。这种方式显然代码实现上容易得多,在效率上,因为只用到了Pthread_mutex_lock这一种系统调用,效率应该要稍微好一点。
以上就是我实现过的基于进程、线程,主要是线程的并发模式。进程另外还有几种模式,我认为那几种模式和线程基本一致,代码写起来也比较相似。实际上,在Linux下,线程其实就是资源共享的进程,都有本身的task_struct结构(《Linux内核设计与实现》)。
在这一部分,主要介绍linux下面,select、poll和epoll的用法和示例。一共下面三个程序:
typedef struct { int maxfd; fd_set read_set; fd_set ready_set; int nready; int maxi; int clientfd[FD_SETSIZE]; } pool; static pool client_pool; init_pool(listenfd,&client_pool); while(1) { client_pool.ready_set = client_pool.read_set; while((client_pool.nready = Select(client_pool.maxfd+1,&client_pool.ready_set,NULL,NULL,NULL)) < 0) { if(errno == EINTR) printf("got a signal restart pselect!!! \n"); } /*mask SIGCHLD!!!!! but some signal will be abondoned */ //client_pool.nready = Pselect(client_pool.maxfd+1,&client_pool.ready_set,NULL,NULL,NULL,&sig_chld); if(FD_ISSET(listenfd,&client_pool.ready_set)){ connfd = Accept(listenfd,(SA *)&clientaddr,&clientlen); add_client(connfd,&client_pool); } check_clients(&client_pool); }
第一个版本,来自于csapp。在pool结构体中clientfd保存全部已链接的fd,read_set是须要select去检测的fd集,ready是select返回已经准备好的fd集。须要注意的地方有:
再看一下这个client_pool的实现:
void init_pool(int listenfd, pool *p) { int i; p->maxi = -1; for (i = 0; i < FD_SETSIZE; i++) p->clientfd[i] = -1; p->maxfd = listenfd; FD_ZERO(&(p->read_set)); FD_SET(listenfd, &p->read_set); } void add_client(int connfd, pool *p) { int i; p->nready--; for (i = 0; i < FD_SETSIZE; i++) { if (p->clientfd[i] < 0) { p->clientfd[i] = connfd; FD_SET(connfd, &p->read_set); if (connfd > p->maxfd) p->maxfd = connfd; if (i > p->maxi) p->maxi = i; break; } } if (i == FD_SETSIZE) app_error("add_client error: Too many clients"); } void check_clients(pool *p) { int i, connfd, n; for (i = 0; (i <= p->maxi) && (p->nready > 0); i++) { connfd = p->clientfd[i]; if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))) { p->nready--; doit(connfd); Close(connfd); FD_CLR(connfd, &p->read_set); p->clientfd[i] = -1; } } }
init_pool初始化,最开始的fd_set里面只有listenfd,add_client将已链接描述符添加到clientfd,并将read_set置位。check_clients是循环依次检查是哪个已链接fd,处理完毕后,将fd从clientfd和read_set中移除。
从上面的过程当中,能够看出select有几个明显的缺点:
struct pollfd{ int fd; //fd to check short events; //events of interest on fd short revents; //events that occurred on fd } typedef struct { struct pollfd client[OPEN_MAX]; int maxi; int nready; } pool; static pool client_pool; init_pool(listenfd, &client_pool); while (1) { while ((client_pool.nready = Poll(client_pool.client, client_pool.maxi + 1, INFTIM)) < 0) { if (errno == EINTR) printf("got a signal restart poll!!! \n"); } if (client_pool.client[0].revents & POLLRDNORM) { connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen); add_client(connfd, &client_pool); } check_clients(&client_pool); }
poll的代码,基本与select模式一致。poll不一样的地方在于它使用pollfd结构来表示fdset,而不是位图。没有大小限制,使用起来更为方便。events和revents分别表示须要检测的事件和发生的事件。poll传入的client指针,底层应该是全部的pollfd构成的一个链表。
poll和select的缺点同样,都须要拷贝和轮询,随着fd数量的增大,效率都会大大下降。
typedef struct request_buffer{ int fd;/*fd for the connection */ int epfd;/* fd for epoll */ char buf[MAXBUF]; /*the buffer for the current request*/ size_t pos,last; }request_b struct epoll_event event; // event to register event.data.ptr = (void *)request; event.events = EPOLLIN | EPOLLET; Epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event); while (1) { int n; while ((n = Epoll_wait(epfd, events, MAXEVENTS, -1)) < 0) { if (errno == EINTR) printf("got a signal restart\n"); } for (int i = 0; i < n; i++) { request_b *rb = (request_b *)events[i].data.ptr; // int fd = rb->fd; if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { fprintf(stderr, "epoll error fd %d", rb->fd); Close(rb->fd); continue; } else if (rb->fd == listenfd) { /* the new connection incoming */ int infd; while (1) { infd = accept(listenfd, (SA *)&clientaddr, &clientlen); if (infd < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { /*we have processed all incoming connections*/ break; } else { unix_error("accept error!"); break; } } make_socket_non_blocking(infd); if (verbose) printf("the new connection fd :%d\n", infd); request_b *request = (request_b *)Malloc(sizeof(request_b)); request_init(request, infd, epfd); event.data.ptr = (void *)request; event.events = EPOLLIN | EPOLLET | EPOLLONESHOT; Epoll_ctl(epfd, EPOLL_CTL_ADD, infd, &event); } } else { if (verbose) printf("new data from fd %d\n", rb->fd); doit_nonblock((request_b *)events[i].data.ptr); /* where to close the fd and release the requst!!! */ } } }
这段代码是典型的non-blocking IO + IO multiplexing的模式,因为是non-blocking IO,就不能用前面csapp提供的Rio包,由于要处理数据分包到达的状况。能够注意到结构体request_b就是是用来记录当前fd对应的请求状态,buf是当前请求的缓冲区。doit_nonblock和前面的doit函数有了很大的变化,这里我就不展开了,能够看个人代码,待完善。
epoll有ET和LT两种模式,详细定义见man手册。ET模式可以减小事件触发的次数,可是代码复杂度会增长,IO操做必需要等到EAGAIN,容易漏掉事件。而LT模式,事件触发的次数多一些,代码实现上要简单一点,不容易出错。目前没有明确的结论哪一种模式更高效,应该是看具体的场景。我这里使用了ET模式,在个人doit_nonblock函数里面,对于请求结束的判断还有错误,不可以正确判断一个完整的HTTP请求。
前面说到select的缺点,epoll是怎么解决的呢?
所以,当存在有大量的链接,可是活跃链接数量较少的状况下,epoll是十分高效的。更多的细节,能够参考内核源码
在实际使用中,确定不会单纯的用上面的某一种模式,而是多进程+IO multiplexing 或者 多线程 + IO multiplexing。后面再结合具体的例子,我会再继续研究。本来,我写了一个小的测试程序,可是发现个人测试方法不是十分合理,没有太大意义,就没有放出来了,有兴趣的能够看一看源码里面。
我是从读csapp后半部分开始,集中学习了一下网络编程的内容,这些内容十分基础,也许会对初学者有一些帮助。后续,我还会继续深刻,准备阅读陈硕的muduo,本身再动手写一些代码。
参考连接: